Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip SortingDigest when merging a large digest in HybridDigest #97099

Merged
merged 2 commits into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Skip SortingDigest when merging a large digest in HybridDigest.
This is a small performance optimization that avoids creating an
intermediate SortingDigest when merging a digest tracking many samples.
The current behavior is to keep adding values to SortingDigest until we
cross the threshold for switching to MergingDigest, at which point we
copy all values from SortingDigest to MergingDigest and release the
former.

As a side cleanup, remove the methods for adding a list of digests. It's
not used anywhere and it can be tricky to get right - the current
implementation for HybridDigest is buggy.
  • Loading branch information
kkrik-es committed Jun 26, 2023
commit 6817f2c281aadb789bd7040c860b5cb401017c2a
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

import static org.elasticsearch.tdigest.IntAVLTree.NIL;
Expand Down Expand Up @@ -68,16 +67,6 @@ public int centroidCount() {
return summary.size();
}

@Override
public void add(List<? extends TDigest> others) {
for (TDigest other : others) {
setMinMax(Math.min(min, other.getMin()), Math.max(max, other.getMax()));
for (Centroid centroid : other.centroids()) {
add(centroid.mean(), centroid.count());
}
}
}

@Override
public void add(double x, int w) {
checkValue(x);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.tdigest;

import java.util.Collection;
import java.util.List;

/**
* Uses a {@link SortingDigest} implementation under the covers for small sample populations, then switches to {@link MergingDigest}.
Expand Down Expand Up @@ -80,6 +79,16 @@ public void add(double x, int w) {
}
}

@Override
public void add(TDigest other) {
reserve(other.size());
if (mergingDigest != null) {
mergingDigest.add(other);
} else {
sortingDigest.add(other);
}
}

@Override
public void reserve(long size) {
if (mergingDigest != null) {
Expand All @@ -101,15 +110,6 @@ public void reserve(long size) {
}
}

@Override
public void add(List<? extends TDigest> others) {
if (mergingDigest != null) {
mergingDigest.add(others);
} else {
sortingDigest.add(others);
}
}

@Override
public void compress() {
if (mergingDigest != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/**
* Maintains a t-digest by collecting new points in a buffer that is then sorted occasionally and merged
Expand Down Expand Up @@ -234,56 +233,6 @@ public void add(double x, int w) {
}
}

private void add(double[] m, double[] w, int count) {
if (m.length != w.length) {
throw new IllegalArgumentException("Arrays not same length");
}
if (m.length < count + lastUsedCell) {
// make room to add existing centroids
double[] m1 = new double[count + lastUsedCell];
System.arraycopy(m, 0, m1, 0, count);
m = m1;
double[] w1 = new double[count + lastUsedCell];
System.arraycopy(w, 0, w1, 0, count);
w = w1;
}
double total = 0;
for (int i = 0; i < count; i++) {
total += w[i];
}
merge(m, w, count, null, total, false, compression);
}

@Override
public void add(List<? extends TDigest> others) {
if (others.size() == 0) {
return;
}
int size = 0;
for (TDigest other : others) {
other.compress();
size += other.centroidCount();
}

double[] m = new double[size];
double[] w = new double[size];
int offset = 0;
for (TDigest other : others) {
if (other instanceof MergingDigest md) {
System.arraycopy(md.mean, 0, m, offset, md.lastUsedCell);
System.arraycopy(md.weight, 0, w, offset, md.lastUsedCell);
offset += md.lastUsedCell;
} else {
for (Centroid centroid : other.centroids()) {
m[offset] = centroid.mean();
w[offset] = centroid.count();
offset++;
}
}
}
add(m, w, size);
}

private void mergeNewValues() {
mergeNewValues(compression);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/**
* Simple implementation of the TDigest interface that stores internally and sorts all samples to calculate quantiles and CDFs.
Expand All @@ -50,21 +49,6 @@ public void add(double x, int w) {
min = Math.min(min, x);
}

@Override
public void add(List<? extends TDigest> others) {
long valuesToAddCount = 0;
for (TDigest other : others) {
valuesToAddCount += other.size();
}
reserve(valuesToAddCount);

for (TDigest other : others) {
for (Centroid centroid : other.centroids()) {
add(centroid.mean(), centroid.count());
}
}
}

@Override
public void compress() {
if (isSorted == false) {
Expand Down
11 changes: 0 additions & 11 deletions libs/tdigest/src/main/java/org/elasticsearch/tdigest/TDigest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
package org.elasticsearch.tdigest;

import java.util.Collection;
import java.util.List;
import java.util.Locale;

/**
Expand Down Expand Up @@ -112,8 +111,6 @@ final void checkValue(double x) {
}
}

public abstract void add(List<? extends TDigest> others);

/**
* Re-examines a t-digest to determine whether some centroids are redundant. If your data are
* perversely ordered, this may be a good idea. Even if not, this may save 20% or so in space.
Expand Down Expand Up @@ -202,12 +199,4 @@ public double getMin() {
public double getMax() {
return max;
}

/**
* Override the min and max values for testing purposes
*/
void setMinMax(double min, double max) {
this.min = min;
this.max = max;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ public void testNanDueToBadInitialization() {

// Merge all mds one at a time into md.
for (int i = 0; i < M; ++i) {
List<MergingDigest> singleton = new ArrayList<>();
singleton.add(mds.get(i));
md.add(singleton);
md.add(mds.get(i));
}
Assert.assertFalse(Double.isNaN(md.quantile(0.01)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@

package org.elasticsearch.search.aggregations.metrics;

import org.elasticsearch.tdigest.TDigest;

import java.util.List;

public final class EmptyTDigestState extends TDigestState {
public EmptyTDigestState() {
// Use the sorting implementation to minimize memory allocation.
Expand All @@ -28,16 +24,6 @@ public void add(double x) {
throw new UnsupportedOperationException("Immutable Empty TDigest");
}

@Override
public void add(List<? extends TDigestState> others) {
throw new UnsupportedOperationException("Immutable Empty TDigest");
}

@Override
public void add(TDigest other) {
throw new UnsupportedOperationException("Immutable Empty TDigest");
}

@Override
public void add(TDigestState other) {
throw new UnsupportedOperationException("Immutable Empty TDigest");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
import org.elasticsearch.tdigest.TDigest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

/**
* Decorates {@link org.elasticsearch.tdigest.TDigest} with custom serialization. The underlying implementation for TDigest is selected
Expand Down Expand Up @@ -215,18 +213,6 @@ public void add(double x) {
tdigest.add(x, 1);
}

public void add(List<? extends TDigestState> others) {
List<TDigest> otherTdigests = new ArrayList<>();
for (TDigestState other : others) {
otherTdigests.add(other.tdigest);
}
tdigest.add(otherTdigests);
}

public void add(TDigest other) {
tdigest.add(other);
}

public final void compress() {
tdigest.compress();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

import org.elasticsearch.test.ESTestCase;

import java.util.List;

public class EmptyTDigestStateTests extends ESTestCase {

private static final TDigestState singleton = new EmptyTDigestState();
Expand All @@ -31,8 +29,4 @@ public void testTestAddWithWeight() {
public void testTestAddList() {
expectThrows(UnsupportedOperationException.class, () -> singleton.add(randomDouble(), randomInt(10)));
}

public void testTestAddListTDigest() {
expectThrows(UnsupportedOperationException.class, () -> singleton.add(List.of(new EmptyTDigestState(), new EmptyTDigestState())));
}
}