Skip to content

Commit

Permalink
Merge pull request #498 from apache/weighted_updates_for_kll_items_sk…
Browse files Browse the repository at this point in the history
…etch

Weighted updates for kll items sketch
  • Loading branch information
leerho authored Jan 26, 2024
2 parents b078525 + 44bb7d4 commit ff07407
Show file tree
Hide file tree
Showing 15 changed files with 278 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,5 @@ public int sizeOf(final T[] items) {
* Returns the concrete class of type T
* @return the concrete class of type T
*/
public abstract Class<?> getClassOfT();
public abstract Class<T> getClassOfT();
}
15 changes: 8 additions & 7 deletions src/main/java/org/apache/datasketches/kll/KllDoublesHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@

import org.apache.datasketches.memory.WritableMemory;

//
//
/**
* Static methods to support KllDoublesSketch
* @author Kevin Lang
* @author Alexander Saydakov
*/
//
final class KllDoublesHelper {

/**
Expand All @@ -47,8 +47,8 @@ final class KllDoublesHelper {
* @param weight the given weight
* @return the Items Array.
*/
static double[] createItemsArray(final double item, final int weight) {
final int itemsArrLen = Integer.bitCount(weight);
static double[] createItemsArray(final double item, final long weight) {
final int itemsArrLen = Long.bitCount(weight);
final double[] itemsArr = new double[itemsArrLen];
Arrays.fill(itemsArr, item);
return itemsArr;
Expand Down Expand Up @@ -332,12 +332,13 @@ static void updateDouble(final KllDoublesSketch dblSk, final double item) {
}

//Called from KllDoublesSketch::update with weight
static void updateDouble(final KllDoublesSketch dblSk, final double item, final int weight) {
static void updateDouble(final KllDoublesSketch dblSk, final double item, final long weight) {
if (weight < dblSk.levelsArr[0]) {
for (int i = 0; i < weight; i++) { updateDouble(dblSk, item); }
for (int i = 0; i < (int)weight; i++) { updateDouble(dblSk, item); }
} else {
dblSk.updateMinMax(item);
final KllHeapDoublesSketch tmpSk = new KllHeapDoublesSketch(dblSk.getK(), DEFAULT_M, item, weight);

dblSk.merge(tmpSk);
}
}
Expand Down Expand Up @@ -471,7 +472,7 @@ private static void populateDoubleWorkArrays( //workBuf and workLevels are modif

workLevels[0] = 0;

// Note: the level zero data from "other" was already inserted into "self",
// Note: the level zero data from "other" was already inserted into "self".
// This copies into workbuf.
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[0], workBuf, workLevels[0], selfPopZero);
Expand All @@ -481,7 +482,7 @@ private static void populateDoubleWorkArrays( //workBuf and workLevels are modif
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
workLevels[lvl + 1] = workLevels[lvl] + selfPop + otherPop;

assert selfPop >= 0 && otherPop >= 0;
if (selfPop == 0 && otherPop == 0) { continue; }
else if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[lvl], workBuf, workLevels[lvl], selfPop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,11 @@ public void update(final double item) {
* @param item the item to be repeated. NaNs are ignored.
* @param weight the number of times the update of item is to be repeated. It must be &ge; one.
*/
public void update(final double item, final int weight) {
public void update(final double item, final long weight) {
if (Double.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (weight < 1) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1) { KllDoublesHelper.updateDouble(this, item); }
if (weight < 1L) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1L) { KllDoublesHelper.updateDouble(this, item); }
else { KllDoublesHelper.updateDouble(this, item, weight); }
kllDoublesSV = null;
}
Expand Down
19 changes: 10 additions & 9 deletions src/main/java/org/apache/datasketches/kll/KllFloatsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@

import org.apache.datasketches.memory.WritableMemory;

//
//
/**
* Static methods to support KllFloatsSketch
* @author Kevin Lang
* @author Alexander Saydakov
*/
//
final class KllFloatsHelper {

/**
Expand All @@ -47,8 +47,8 @@ final class KllFloatsHelper {
* @param weight the given weight
* @return the Items Array.
*/
static float[] createItemsArray(final float item, final int weight) {
final int itemsArrLen = Integer.bitCount(weight);
static float[] createItemsArray(final float item, final long weight) {
final int itemsArrLen = Long.bitCount(weight);
final float[] itemsArr = new float[itemsArrLen];
Arrays.fill(itemsArr, item);
return itemsArr;
Expand Down Expand Up @@ -332,12 +332,13 @@ static void updateFloat(final KllFloatsSketch fltSk, final float item) {
}

//Called from KllFloatsSketch::update with weight
static void updateFloat(final KllFloatsSketch fltSk, final float item, final int weight) {
static void updateFloat(final KllFloatsSketch fltSk, final float item, final long weight) {
if (weight < fltSk.levelsArr[0]) {
for (int i = 0; i < weight; i++) { updateFloat(fltSk, item); }
for (int i = 0; i < (int)weight; i++) { updateFloat(fltSk, item); }
} else {
fltSk.updateMinMax(item);
final KllHeapFloatsSketch tmpSk = new KllHeapFloatsSketch(fltSk.getK(), DEFAULT_M, item, weight);

fltSk.merge(tmpSk);
}
}
Expand Down Expand Up @@ -471,7 +472,7 @@ private static void populateFloatWorkArrays(

worklevels[0] = 0;

// Note: the level zero data from "other" was already inserted into "self"
// Note: the level zero data from "other" was already inserted into "self".
// This copies into workbuf.
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy( myCurFloatItemsArr, myCurLevelsArr[0], workbuf, worklevels[0], selfPopZero);
Expand All @@ -481,14 +482,14 @@ private static void populateFloatWorkArrays(
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
worklevels[lvl + 1] = worklevels[lvl] + selfPop + otherPop;

assert selfPop >= 0 && otherPop >= 0;
if (selfPop == 0 && otherPop == 0) { continue; }
if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurFloatItemsArr, myCurLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
}
}
else if (selfPop == 0 && otherPop > 0) {
System.arraycopy(otherFloatItemsArr, otherLevelsArr[lvl], workbuf, worklevels[lvl], otherPop);
}
}
else if (selfPop > 0 && otherPop > 0) {
mergeSortedFloatArrays(
myCurFloatItemsArr, myCurLevelsArr[lvl], selfPop,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,11 @@ public void update(final float item) {
* @param item the item to be repeated. NaNs are ignored.
* @param weight the number of times the update of item is to be repeated. It must be &ge; one.
*/
public void update(final float item, final int weight) {
public void update(final float item, final long weight) {
if (Float.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (weight < 1) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1) { KllFloatsHelper.updateFloat(this, item); }
if (weight < 1L) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1L) { KllFloatsHelper.updateFloat(this, item); }
else { KllFloatsHelper.updateFloat(this, item, weight); }
kllFloatsSV = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ final class KllHeapDoublesSketch extends KllDoublesSketch {
*
* @param k parameter that controls size of the sketch and accuracy of estimates.
* <em>k</em> can be between <em>m</em> and 65535, inclusive.
* The default <em>k</em> = 200 results in a normalized rank error of about 1.65%.
* Larger <em>k</em> will have smaller error but the sketch will be larger (and slower).
* @param m parameter controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8 is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
Expand All @@ -84,7 +82,7 @@ final class KllHeapDoublesSketch extends KllDoublesSketch {
/**
* Used for creating a temporary sketch for use with weighted updates.
*/
KllHeapDoublesSketch(final int k, final int m, final double item, final int weight) {
KllHeapDoublesSketch(final int k, final int m, final double item, final long weight) {
super(UPDATABLE);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ final class KllHeapFloatsSketch extends KllFloatsSketch {
*
* @param k parameter that controls size of the sketch and accuracy of estimates.
* <em>k</em> can be between <em>m</em> and 65535, inclusive.
* The default <em>k</em> = 200 results in a normalized rank error of about 1.65%.
* Larger <em>k</em> will have smaller error but the sketch will be larger (and slower).
* @param m parameter controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8 is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
Expand All @@ -84,7 +82,7 @@ final class KllHeapFloatsSketch extends KllFloatsSketch {
/**
* Used for creating a temporary sketch for use with weighted updates.
*/
KllHeapFloatsSketch(final int k, final int m, final float item, final int weight) {
KllHeapFloatsSketch(final int k, final int m, final float item, final long weight) {
super(UPDATABLE);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
Expand Down
50 changes: 39 additions & 11 deletions src/main/java/org/apache/datasketches/kll/KllHeapItemsSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_EMPTY;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_FULL;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_SINGLE;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.UPDATABLE;

import java.lang.reflect.Array;
import java.util.Comparator;
Expand All @@ -34,6 +35,14 @@
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;

/**
* This class implements an on-heap doubles KllSketch.
*
* <p>Please refer to the documentation in the package-info:<br>
* {@link org.apache.datasketches.kll}</p>
*
* @author Lee Rhodes, Kevin Lang
*/
@SuppressWarnings("unchecked")
final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
private final int k; // configured size of K.
Expand All @@ -46,14 +55,17 @@ final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
private Object[] itemsArr;

/**
* Constructs a new empty instance of this sketch on the Java heap.
* New instance heap constructor.
* @param k parameter that controls size of the sketch and accuracy of estimates.
* @param m parameter controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8 is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
* @param comparator user specified comparator of type T.
* @param serDe serialization / deserialization class
*/
KllHeapItemsSketch(
final int k,
final int m,
final Comparator<? super T> comparator,
KllHeapItemsSketch(final int k, final int m, final Comparator<? super T> comparator,
final ArrayOfItemsSerDe<T> serDe) {
super(SketchStructure.UPDATABLE, comparator, serDe);
super(UPDATABLE, comparator, serDe);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
this.levelsArr = new int[] {k, k};
Expand All @@ -69,11 +81,27 @@ final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
}

/**
* The Heapify constructor, which constructs an image of this sketch from
* a Memory (or WritableMemory) object that was created by this sketch
* and has a type T consistent with the given comparator and serDe.
* Once the data from the given Memory has been transferred into this heap sketch,
* the reference to the Memory object is no longer retained.
* Used for creating a temporary sketch for use with weighted updates.
*/
KllHeapItemsSketch(final int k, final int m, final T item, final long weight, final Comparator<? super T> comparator,
final ArrayOfItemsSerDe<T> serDe) {
super(UPDATABLE, comparator, serDe);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
this.levelsArr = KllHelper.createLevelsArray(weight);
this.readOnly = false;
this.k = k;
this.m = m;
this.n = weight;
this.minK = k;
this.isLevelZeroSorted = false;
this.minItem = item;
this.maxItem = item;
this.itemsArr = KllItemsHelper.createItemsArray(serDe.getClassOfT(), item, weight);
}

/**
* The Heapify constructor
* @param srcMem the Source Memory image that contains data.
* @param comparator the comparator for this sketch and given Memory.
* @param serDe the serializer / deserializer for this sketch and the given Memory.
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/org/apache/datasketches/kll/KllHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,11 @@ public static long convertToCumulative(final long[] array) {
* @param weight the given weight
* @return the Levels Array
*/
static int[] createLevelsArray(final int weight) {
final int numLevels = 32 - Integer.numberOfLeadingZeros(weight);
static int[] createLevelsArray(final long weight) {
final int numLevels = 64 - Long.numberOfLeadingZeros(weight);
if (numLevels > 61) {
throw new SketchesArgumentException("The requested weight must not exceed 2^61");
}
final int[] levelsArr = new int[numLevels + 1]; //always one more than numLevels
int itemsArrIndex = 0;
levelsArr[0] = itemsArrIndex;
Expand Down
Loading

0 comments on commit ff07407

Please sign in to comment.