Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Weighted updates for kll items sketch #498

Merged
merged 3 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,5 @@ public int sizeOf(final T[] items) {
* Returns the concrete class of type T
* @return the concrete class of type T
*/
public abstract Class<?> getClassOfT();
public abstract Class<T> getClassOfT();
}
15 changes: 8 additions & 7 deletions src/main/java/org/apache/datasketches/kll/KllDoublesHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@

import org.apache.datasketches.memory.WritableMemory;

//
//
/**
* Static methods to support KllDoublesSketch
* @author Kevin Lang
* @author Alexander Saydakov
*/
//
final class KllDoublesHelper {

/**
Expand All @@ -47,8 +47,8 @@ final class KllDoublesHelper {
* @param weight the given weight
* @return the Items Array.
*/
static double[] createItemsArray(final double item, final int weight) {
final int itemsArrLen = Integer.bitCount(weight);
static double[] createItemsArray(final double item, final long weight) {
final int itemsArrLen = Long.bitCount(weight);
final double[] itemsArr = new double[itemsArrLen];
Arrays.fill(itemsArr, item);
return itemsArr;
Expand Down Expand Up @@ -332,12 +332,13 @@ static void updateDouble(final KllDoublesSketch dblSk, final double item) {
}

//Called from KllDoublesSketch::update with weight
static void updateDouble(final KllDoublesSketch dblSk, final double item, final int weight) {
static void updateDouble(final KllDoublesSketch dblSk, final double item, final long weight) {
if (weight < dblSk.levelsArr[0]) {
for (int i = 0; i < weight; i++) { updateDouble(dblSk, item); }
for (int i = 0; i < (int)weight; i++) { updateDouble(dblSk, item); }
} else {
dblSk.updateMinMax(item);
final KllHeapDoublesSketch tmpSk = new KllHeapDoublesSketch(dblSk.getK(), DEFAULT_M, item, weight);

dblSk.merge(tmpSk);
}
}
Expand Down Expand Up @@ -471,7 +472,7 @@ private static void populateDoubleWorkArrays( //workBuf and workLevels are modif

workLevels[0] = 0;

// Note: the level zero data from "other" was already inserted into "self",
// Note: the level zero data from "other" was already inserted into "self".
// This copies into workbuf.
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[0], workBuf, workLevels[0], selfPopZero);
Expand All @@ -481,7 +482,7 @@ private static void populateDoubleWorkArrays( //workBuf and workLevels are modif
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
workLevels[lvl + 1] = workLevels[lvl] + selfPop + otherPop;

assert selfPop >= 0 && otherPop >= 0;
if (selfPop == 0 && otherPop == 0) { continue; }
else if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[lvl], workBuf, workLevels[lvl], selfPop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,11 @@ public void update(final double item) {
* @param item the item to be repeated. NaNs are ignored.
* @param weight the number of times the update of item is to be repeated. It must be &ge; one.
*/
public void update(final double item, final int weight) {
public void update(final double item, final long weight) {
if (Double.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (weight < 1) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1) { KllDoublesHelper.updateDouble(this, item); }
if (weight < 1L) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1L) { KllDoublesHelper.updateDouble(this, item); }
else { KllDoublesHelper.updateDouble(this, item, weight); }
kllDoublesSV = null;
}
Expand Down
19 changes: 10 additions & 9 deletions src/main/java/org/apache/datasketches/kll/KllFloatsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@

import org.apache.datasketches.memory.WritableMemory;

//
//
/**
* Static methods to support KllFloatsSketch
* @author Kevin Lang
* @author Alexander Saydakov
*/
//
final class KllFloatsHelper {

/**
Expand All @@ -47,8 +47,8 @@ final class KllFloatsHelper {
* @param weight the given weight
* @return the Items Array.
*/
static float[] createItemsArray(final float item, final int weight) {
final int itemsArrLen = Integer.bitCount(weight);
static float[] createItemsArray(final float item, final long weight) {
final int itemsArrLen = Long.bitCount(weight);
final float[] itemsArr = new float[itemsArrLen];
Arrays.fill(itemsArr, item);
return itemsArr;
Expand Down Expand Up @@ -332,12 +332,13 @@ static void updateFloat(final KllFloatsSketch fltSk, final float item) {
}

//Called from KllFloatsSketch::update with weight
static void updateFloat(final KllFloatsSketch fltSk, final float item, final int weight) {
static void updateFloat(final KllFloatsSketch fltSk, final float item, final long weight) {
if (weight < fltSk.levelsArr[0]) {
for (int i = 0; i < weight; i++) { updateFloat(fltSk, item); }
for (int i = 0; i < (int)weight; i++) { updateFloat(fltSk, item); }
} else {
fltSk.updateMinMax(item);
final KllHeapFloatsSketch tmpSk = new KllHeapFloatsSketch(fltSk.getK(), DEFAULT_M, item, weight);

fltSk.merge(tmpSk);
}
}
Expand Down Expand Up @@ -471,7 +472,7 @@ private static void populateFloatWorkArrays(

worklevels[0] = 0;

// Note: the level zero data from "other" was already inserted into "self"
// Note: the level zero data from "other" was already inserted into "self".
// This copies into workbuf.
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy( myCurFloatItemsArr, myCurLevelsArr[0], workbuf, worklevels[0], selfPopZero);
Expand All @@ -481,14 +482,14 @@ private static void populateFloatWorkArrays(
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
worklevels[lvl + 1] = worklevels[lvl] + selfPop + otherPop;

assert selfPop >= 0 && otherPop >= 0;
if (selfPop == 0 && otherPop == 0) { continue; }
if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurFloatItemsArr, myCurLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
}
}
else if (selfPop == 0 && otherPop > 0) {
System.arraycopy(otherFloatItemsArr, otherLevelsArr[lvl], workbuf, worklevels[lvl], otherPop);
}
}
else if (selfPop > 0 && otherPop > 0) {
mergeSortedFloatArrays(
myCurFloatItemsArr, myCurLevelsArr[lvl], selfPop,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,11 @@ public void update(final float item) {
* @param item the item to be repeated. NaNs are ignored.
* @param weight the number of times the update of item is to be repeated. It must be &ge; one.
*/
public void update(final float item, final int weight) {
public void update(final float item, final long weight) {
if (Float.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (weight < 1) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1) { KllFloatsHelper.updateFloat(this, item); }
if (weight < 1L) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1L) { KllFloatsHelper.updateFloat(this, item); }
else { KllFloatsHelper.updateFloat(this, item, weight); }
kllFloatsSV = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ final class KllHeapDoublesSketch extends KllDoublesSketch {
*
* @param k parameter that controls size of the sketch and accuracy of estimates.
* <em>k</em> can be between <em>m</em> and 65535, inclusive.
* The default <em>k</em> = 200 results in a normalized rank error of about 1.65%.
* Larger <em>k</em> will have smaller error but the sketch will be larger (and slower).
* @param m parameter controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8 is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
Expand All @@ -84,7 +82,7 @@ final class KllHeapDoublesSketch extends KllDoublesSketch {
/**
* Used for creating a temporary sketch for use with weighted updates.
*/
KllHeapDoublesSketch(final int k, final int m, final double item, final int weight) {
KllHeapDoublesSketch(final int k, final int m, final double item, final long weight) {
super(UPDATABLE);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ final class KllHeapFloatsSketch extends KllFloatsSketch {
*
* @param k parameter that controls size of the sketch and accuracy of estimates.
* <em>k</em> can be between <em>m</em> and 65535, inclusive.
* The default <em>k</em> = 200 results in a normalized rank error of about 1.65%.
* Larger <em>k</em> will have smaller error but the sketch will be larger (and slower).
* @param m parameter controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8 is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
Expand All @@ -84,7 +82,7 @@ final class KllHeapFloatsSketch extends KllFloatsSketch {
/**
* Used for creating a temporary sketch for use with weighted updates.
*/
KllHeapFloatsSketch(final int k, final int m, final float item, final int weight) {
KllHeapFloatsSketch(final int k, final int m, final float item, final long weight) {
super(UPDATABLE);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
Expand Down
50 changes: 39 additions & 11 deletions src/main/java/org/apache/datasketches/kll/KllHeapItemsSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_EMPTY;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_FULL;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_SINGLE;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.UPDATABLE;

import java.lang.reflect.Array;
import java.util.Comparator;
Expand All @@ -34,6 +35,14 @@
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;

/**
* This class implements an on-heap doubles KllSketch.
*
* <p>Please refer to the documentation in the package-info:<br>
* {@link org.apache.datasketches.kll}</p>
*
* @author Lee Rhodes, Kevin Lang
*/
@SuppressWarnings("unchecked")
final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
private final int k; // configured size of K.
Expand All @@ -46,14 +55,17 @@ final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
private Object[] itemsArr;

/**
* Constructs a new empty instance of this sketch on the Java heap.
* New instance heap constructor.
* @param k parameter that controls size of the sketch and accuracy of estimates.
* @param m parameter controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8 is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
* @param comparator user specified comparator of type T.
* @param serDe serialization / deserialization class
*/
KllHeapItemsSketch(
final int k,
final int m,
final Comparator<? super T> comparator,
KllHeapItemsSketch(final int k, final int m, final Comparator<? super T> comparator,
final ArrayOfItemsSerDe<T> serDe) {
super(SketchStructure.UPDATABLE, comparator, serDe);
super(UPDATABLE, comparator, serDe);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
this.levelsArr = new int[] {k, k};
Expand All @@ -69,11 +81,27 @@ final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
}

/**
* The Heapify constructor, which constructs an image of this sketch from
* a Memory (or WritableMemory) object that was created by this sketch
* and has a type T consistent with the given comparator and serDe.
* Once the data from the given Memory has been transferred into this heap sketch,
* the reference to the Memory object is no longer retained.
* Used for creating a temporary sketch for use with weighted updates.
*/
KllHeapItemsSketch(final int k, final int m, final T item, final long weight, final Comparator<? super T> comparator,
final ArrayOfItemsSerDe<T> serDe) {
super(UPDATABLE, comparator, serDe);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
this.levelsArr = KllHelper.createLevelsArray(weight);
this.readOnly = false;
this.k = k;
this.m = m;
this.n = weight;
this.minK = k;
this.isLevelZeroSorted = false;
this.minItem = item;
this.maxItem = item;
this.itemsArr = KllItemsHelper.createItemsArray(serDe.getClassOfT(), item, weight);
}

/**
* The Heapify constructor
* @param srcMem the Source Memory image that contains data.
* @param comparator the comparator for this sketch and given Memory.
* @param serDe the serializer / deserializer for this sketch and the given Memory.
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/org/apache/datasketches/kll/KllHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,11 @@ public static long convertToCumulative(final long[] array) {
* @param weight the given weight
* @return the Levels Array
*/
static int[] createLevelsArray(final int weight) {
final int numLevels = 32 - Integer.numberOfLeadingZeros(weight);
static int[] createLevelsArray(final long weight) {
final int numLevels = 64 - Long.numberOfLeadingZeros(weight);
if (numLevels > 61) {
throw new SketchesArgumentException("The requested weight must not exceed 2^61");
}
final int[] levelsArr = new int[numLevels + 1]; //always one more than numLevels
int itemsArrIndex = 0;
levelsArr[0] = itemsArrIndex;
Expand Down
Loading
Loading