
Theta Sketch Aggregation Enhancements #12042

Merged
merged 14 commits into apache:master on Dec 5, 2023

Conversation

davecromberge
Member

@davecromberge davecromberge commented Nov 22, 2023

Introduces additional parameters to the DistinctCountThetaSketch aggregation function that give the end user more control over how sketches are aggregated at query time. The defaults are selected to ensure that behaviour remains unchanged from the current implementation.

Furthermore, a custom accumulator object is added to avoid pairwise union operations as much as possible. Instead, sketches are accumulated and merged together once a threshold is met.

This PR is a performance enhancement and can be tagged/labelled as such.

release-notes:

  • add configuration options for DistinctCountThetaSketchAggregationFunction
  • respect ordering for existing Theta sketches to use "early-stop" optimisations for unions.

@davecromberge
Member Author

davecromberge commented Nov 22, 2023

Notes for reviewers:

The changes in this PR introduce a different intermediate result type for the DistinctCountThetaSketch aggregation function. The reason for doing so is to eliminate the intermediate bookkeeping necessary to perform unions on two sketches at a time.
Moreover, the "early-stop" optimisation in set operations skips further processing once retained hash values rise above the minimum theta value. This applies to other post-set-operation expressions as well.

When using JMH benchmarks to simulate this scenario, the speedup achieved by accumulating sketches prior to union is often a factor of three.
However, I have not been able to observe this speedup in actual test queries due to limitations in my test environment. In practice, this will matter most when sketches require merging from thousands of segments on a single server.
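The batching idea can be sketched in isolation as follows. This is a minimal illustration, not the PR's actual ThetaSketchAccumulator: `StubSketch`, the class name, and the method names are hypothetical stand-ins, and the real union work is elided to a counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Theta sketch; only identity matters here.
class StubSketch {
}

// Minimal sketch of the accumulator idea: buffer inputs and union them in
// batches once a threshold is reached, instead of one union per input pair.
class SketchAccumulator {
  private final List<StubSketch> _buffer = new ArrayList<>();
  private final int _threshold;
  private int _unionCount = 0;

  SketchAccumulator(int threshold) {
    _threshold = threshold;
  }

  void add(StubSketch sketch) {
    _buffer.add(sketch);
    if (_buffer.size() >= _threshold) {
      unionAll();
    }
  }

  // Final catch-all: flush any remaining buffered sketches when the
  // result is extracted, even if the threshold was never met.
  void getResult() {
    if (!_buffer.isEmpty()) {
      unionAll();
    }
  }

  private void unionAll() {
    // A real implementation would feed _buffer into a single
    // org.apache.datasketches.theta.Union here.
    _unionCount++;
    _buffer.clear();
  }

  int getUnionCount() {
    return _unionCount;
  }
}
```

With a threshold of 3, seven inputs trigger only two batched unions plus one final flush, rather than six pairwise unions.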

Reference:
apache/datasketches-java#326 (comment)
https://datasketches.apache.org/docs/Theta/ThetaSize.html

Alternatives considered

  1. Only introducing additional parameters - this avoids the complexity in accumulating intermediate sketches and still gives the end user the option to use p-sampling.

  2. Using Union as the intermediate type. The Datasketches library exposes the ability to serialize a union as bytes, but does not expose the capability to use fastWrap, which deserialises the union back into an object on the heap. This would be simpler than the approach in this PR, and if desired I can work with the Datasketches team to add this capability.

@codecov-commenter

codecov-commenter commented Nov 22, 2023

Codecov Report

Attention: 32 lines in your changes are missing coverage. Please review.

Comparison is base (d7c76b9) 61.62% compared to head (bebf993) 61.66%.
Report is 10 commits behind head on master.

Files Patch % Lines
...n/DistinctCountThetaSketchAggregationFunction.java 81.39% 15 Missing and 1 partial ⚠️
...ent/local/customobject/ThetaSketchAccumulator.java 71.73% 10 Missing and 3 partials ⚠️
...org/apache/pinot/core/common/ObjectSerDeUtils.java 83.33% 1 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #12042      +/-   ##
============================================
+ Coverage     61.62%   61.66%   +0.03%     
  Complexity     1152     1152              
============================================
  Files          2389     2390       +1     
  Lines        129824   129954     +130     
  Branches      20083    20100      +17     
============================================
+ Hits          80009    80136     +127     
+ Misses        43989    43979      -10     
- Partials       5826     5839      +13     
Flag Coverage Δ
custom-integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 61.60% <80.60%> (+<0.01%) ⬆️
java-21 61.52% <80.60%> (+0.03%) ⬆️
skip-bytebuffers-false 61.63% <80.60%> (+0.01%) ⬆️
skip-bytebuffers-true 61.50% <80.60%> (+0.03%) ⬆️
temurin 61.66% <80.60%> (+0.03%) ⬆️
unittests 61.66% <80.60%> (+0.03%) ⬆️
unittests1 46.90% <58.78%> (+<0.01%) ⬆️
unittests2 27.64% <21.81%> (+0.02%) ⬆️


This aligns better with the default nominal entries parameter
used in the query aggregation function.
@davecromberge davecromberge marked this pull request as draft November 24, 2023 11:22
@davecromberge
Member Author

davecromberge commented Nov 24, 2023

I'm converting this to a draft in order to gather feedback on the approach. I have explored point 2 in the alternatives section above which might lead to a simpler implementation - see commit 2ed38f3.

Whilst somewhat simpler, that approach can degrade performance during the "merge" operation, where two UnionWraps are combined in pairwise fashion. For this reason, I've reverted to my original approach.

Removes intermediate array list to buffer/accumulate sketch
elements.  Instead, inputs are fed directly to the underlying
union.  This ensures that the memory usage of the merge is
kept under control.
Contributor
@swaminathanmanish swaminathanmanish left a comment

Thanks for the detailed description and the reasoning behind this approach. Intuitively, this approach makes sense. I took a first pass over the PR and have some high-level questions:

  1. There are 4 new params introduced. Have we quantified the gains for each of these params, and which one yields the largest gains? I'm assuming these params work independently of each other.
  2. We have a nominal entries param for a sketch (which is the number of entries in a sketch?). Curious whether we have already experimented with tuning this param to figure out the gains? How can this parameter impact performance?
  3. Could you share the benchmark results/numbers for different values of the params?

}

// Performance optimisation: ensure that the minimum Theta is used for "early-stop".
_accumulator.sort(Comparator.comparingDouble(Sketch::getTheta));
Contributor

Is the optimization here to stop the sort based on a theta threshold? Just trying to understand how early stop takes effect here.

Member Author
@davecromberge davecromberge Nov 28, 2023

Internally, a compact and ordered Theta sketch can be compared to a sorted array of K items, where K is often on the order of thousands.

To merge several sketches, the union operator retains the items from each array, which then need to be de-duplicated and stored wherever the item is less than the threshold, Theta. If Theta is chosen to be the minimum across those arrays, the union only has to process items up to the minimum Theta for each sketch in the merge, reducing de-duplication costs and potentially thousands of hash-collision checks.
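The effect can be illustrated with plain sorted arrays. This is a toy model of the idea, not the DataSketches internals; theta is represented here as a fraction of a small hash range, and all names are hypothetical.

```java
import java.util.TreeSet;

// Toy model of the "early-stop" union: each ordered sketch is a sorted array
// of hash values below its theta. Only values below the minimum theta across
// all inputs can survive the union, so scanning each sorted array can stop at
// the first value >= minTheta instead of processing the whole array.
class EarlyStopUnion {
  static int unionRetained(long[][] orderedSketches, double[] thetas, long hashRange) {
    double minTheta = 1.0;
    for (double t : thetas) {
      minTheta = Math.min(minTheta, t);
    }
    long cutoff = (long) (minTheta * hashRange);
    TreeSet<Long> retained = new TreeSet<>();
    for (long[] hashes : orderedSketches) {
      for (long h : hashes) {
        if (h >= cutoff) {
          break; // early stop: the array is sorted, nothing beyond survives
        }
        retained.add(h); // de-duplication happens only below the cutoff
      }
    }
    return retained.size();
  }
}
```

Processing the inputs in ascending theta order maximises how much of each array the early stop can skip.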

Contributor

I also have this question, as I don't see the code for early-terminating the merge. I would assume that this happens within Union, or that some sketch-side code triggers the early stop based on the sorted property of the sketches?

I think it would be great if we could capture the explanation above as a comment in the code, to provide more insight into this optimization.

Member Author

Done - see 90d4562.

// optimisations on unions.
boolean shouldCompact = !value.isCompact();
boolean shouldOrder = value.isOrdered();
if (shouldCompact) {
Contributor

I assume shouldCompact defaults to true?

Member Author
@davecromberge davecromberge Nov 28, 2023

No - this is not the case; it depends on how the ingested sketches have been processed by the end user. It might be true for the empty and singleton sketch (a sketch of a single hashed item), but not for others.
The emphasis of these changes is that the serialiser does not alter the current ordering of existing sketches. There is also a small performance optimisation here: there is no need to re-compact or re-order sketches that are already in that state.
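The serialiser's decision can be summarised as a small truth table. This is illustrative only; the class and method names are stand-ins, and a label is returned instead of bytes for clarity.

```java
// Illustrative decision logic: compact only when needed, and never change the
// sketch's existing ordering.
class SerializeDecision {
  static String decide(boolean isCompact, boolean isOrdered) {
    if (!isCompact) {
      // Compact for serialisation, preserving whatever ordering already exists.
      return isOrdered ? "compact(ordered)" : "compact(unordered)";
    }
    // Already compact: serialise as-is, no re-compaction or re-ordering.
    return "as-is";
  }
}
```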

_accumulator.add(sketch);
_numInputs += 1;

if (_accumulator.size() >= _threshold) {
Contributor

The main optimization here is to batch the sketch unions instead of doing many pairwise unions, to amortize the sort-merge cost, right? And that's where we tune this threshold parameter.

Member Author

Yes, correct. The early-stop mechanism and internal bookkeeping structure (a QuickSelect updatable sketch on heap) need only be created once per batched merge. The penalty that the end user pays for this amortisation is that the merge can consume more memory.

_accumulator.add(sketch);
_numInputs += 1;

if (_accumulator.size() >= _threshold) {
Contributor

is there a final catch-all to do the union (last batch), even if the threshold is not met? I guess it happens via

public Sketch getResult() {
return unionAll();
}

Member Author

That's right - the result is extracted when the server returns a result to the broker or when the broker performs the final post aggregation operations.

accumulator.setOrdered(_intermediateOrdering);
accumulator.setThreshold(_accumulatorThreshold);
accumulator.setSetOperationBuilder(_setOperationBuilder);
mergedSketches.add(accumulator.getResult());
Contributor

What would happen in the default case? I assume it would just do a union for every sketch pair, instead of the intermediate batching (default threshold = 2).

Member Author

Yes, this is the default behaviour - I believe that this matches the existing implementation.

@@ -261,6 +261,11 @@ public static Sketch thetaSketchDiff(Object sketchObjectA, Object sketchObjectB)
return diff.getResult(false, null, false);
}

@ScalarFunction(names = {"thetaSketchToString", "theta_sketch_to_string"})
Contributor

Are these new functions added to take in the new parameters?

Member Author
@davecromberge davecromberge Nov 28, 2023

No, these are useful for debugging purposes. They can be used to inspect the preamble flags for sorting and compaction, either for sketches in storage or for sketches returned by the aggregation function.

int nominalEntries = parameters.getNominalEntries();
_updateSketchBuilder.setNominalEntries(nominalEntries);
_setOperationBuilder.setNominalEntries(nominalEntries);
// Sampling probability sets the initial value of Theta, defaults to 1.0
Contributor

Is this sampling to skip entries in a sketch, while doing the sort/merge?

Member Author

This sampling probability is useful to end users who wish to build sketches on the fly from raw values or from existing sketches. These users might wish to trim down the size of degenerate sketches (sketches that are below capacity). There are certain use cases (such as ordering by top ranked items) where the user might not care if the tail of the list is negatively impacted in terms of error.
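The tradeoff can be made concrete with a small simulation. This is written under assumed semantics rather than the library's implementation: with sampling probability p, each distinct item is retained with probability p, and the unbiased estimate is retained / p. All names are hypothetical.

```java
import java.util.Random;

// Toy simulation of p-sampling: retain each distinct item with probability p
// and scale the retained count back up by 1/p. Real Theta sketches achieve a
// similar effect by initialising theta to p.
class PSamplingDemo {
  static double estimate(int distinctItems, double p, long seed) {
    Random rnd = new Random(seed);
    int retained = 0;
    for (int i = 0; i < distinctItems; i++) {
      if (rnd.nextDouble() < p) {
        retained++;
      }
    }
    return retained / p; // unbiased estimate of distinctItems
  }
}
```

A smaller p shrinks the sketch (fewer retained items) at the cost of higher variance in the estimate, which is why it suits use cases that tolerate extra error on the long tail.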

_setOperationBuilder.setP(p);
_updateSketchBuilder.setP(p);
// Resize factor controls the size multiple that affects how fast the internal cache grows
ResizeFactor rf = parameters.getResizeFactor();
Contributor

Curious how this is tuned and how it impacts performance?

Member Author

The resize factor trades off memory consumption for performance. Sometimes the internal hash table of the sketch reaches capacity and must be resized, which is an expensive operation: all existing keys need to be rehashed.
Using a different resize factor controls how large the initial hash table is on construction, thereby trading off the cost of resize operations against potential over-allocation of memory on the heap.
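A little arithmetic makes the tradeoff concrete (illustrative only; the start size and target are arbitrary, and the class name is a stand-in):

```java
// With resize factor R, a table growing from `start` slots to at least
// `target` slots is rebuilt ceil(log_R(target / start)) times. Each rebuild
// rehashes every existing key, so a larger R buys fewer expensive rehashes at
// the cost of coarser (potentially over-allocated) memory steps.
class ResizeCost {
  static int resizeCount(int start, int target, int factor) {
    int size = start;
    int resizes = 0;
    while (size < target) {
      size *= factor;
      resizes++;
    }
    return resizes;
  }
}
```

For example, growing from 128 to 65536 slots takes nine rebuilds at a factor of 2 but only three at a factor of 8.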

@davecromberge
Member Author

Thank you for your review @swaminathanmanish. I found your questions very insightful. I should make it clear at the outset that it has been difficult to thoroughly benchmark the work in this pull request in a test environment. Some of the performance improvements are tuning knobs whose real-world behaviour remains speculative.

Therefore, I would like to propose that the parameters are retained and tested in a production environment, and subsequently removed should they show no user benefit. The downside of this approach is that backward compatibility will not be maintained if end users start to depend on them. As for your questions, answers follow inline.

  1. There are 4 new params introduced. Have we quantified the gains for each of these params and which one yields the largest gains? Im assuming these params work independent of each other.

The largest gain can be realised by adjusting the sampling probability parameter. However, this is highly dependent on the use case and should only be used in certain circumstances. All parameters are independent of each other, and the defaults have been selected to retain the existing behaviour of the system. The performance gains measured in testing range between 25% and 50%. Where the results are sampled, speed increases by 300%.

  2. We have nominal entries param for a sketch (which is the number of entries in a sketch?). Curious if we have already experimented tuning this param to figure out the gains? How can this parameter impact performance.

We have been using this extensively, but it does not help where there is a large tail of sketches with fewer retained items than nominal entries. For these cases, selecting a lower nominal entries value impacts the error across the board. Instead, sampling probability allows the end user to curtail size and increase error only on the long tail, which, for some reports and queries, is a good tradeoff.

  3. Could you share the benchmark results/numbers for different values of the params

I'd like to do more testing in a production environment, but the default set of parameters can be up to 25% faster than the current implementation (through sorting and ordered unions). Adjusting others, such as sampling, has shown order-of-magnitude gains - but there are tradeoffs on accuracy that need to be considered.

@davecromberge davecromberge marked this pull request as ready for review November 28, 2023 22:21
Contributor
@snleee snleee left a comment

LGTM otherwise

@@ -261,6 +261,11 @@ public static Sketch thetaSketchDiff(Object sketchObjectA, Object sketchObjectB)
return diff.getResult(false, null, false);
}

@ScalarFunction(names = {"thetaSketchToString", "theta_sketch_to_string"})
public static String thetaSketchToString(Object sketchObject) {
Contributor

What do we return for this function? Can we add some examples for commit notes?

Member Author

This function returns a string summary of the current sketch state. Note that the function name is consistent with the equivalent function in Druid. See this link.

Examples include:

### HeapCompactOrderedSketch SUMMARY: 
   Estimate                : 149586.73149344584
   Upper Bound, 95% conf   : 154287.5017892762
   Lower Bound, 95% conf   : 145028.6046846571
   Theta (double)          : 0.027382107751846067
   Theta (long)            : 252555366948521403
   Theta (long) hex        : 038141c4a515c5bb
   EstMode?                : true
   Empty?                  : false
   Array Size Entries      : 4096
   Retained Entries        : 4096
   Seed Hash               : 93cc
### END SKETCH SUMMARY

and:

### HeapCompactOrderedSketch SUMMARY: 
   Estimate                : 48249.113729035394
   Upper Bound, 95% conf   : 50358.736970106176
   Lower Bound, 95% conf   : 46227.35737896924
   Theta (double)          : 0.04377282475820978
   Theta (long)            : 403733047849016500
   Theta (long) hex        : 059a591165205cb4
   EstMode?                : true
   Empty?                  : false
   Array Size Entries      : 2112
   Retained Entries        : 2112
   Seed Hash               : 93cc
### END SKETCH SUMMARY

These allow the end user to inspect whether a sketch is in estimation mode, whether it is ordered, and its state - whether updatable or compact.

Where is the best place to include these examples for the commit notes?

// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
- public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
+ public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 16384;
Contributor

Is this configurable?

Member Author

No, unfortunately this is currently not configurable to my knowledge. This smaller value more closely resembles the default that we select when querying the theta sketch, which is 4096.
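For context on what the default trades off: the relative standard error (RSE) of a Theta sketch in estimation mode is approximately 1/sqrt(k) for k nominal entries (per the DataSketches error table). A tiny helper makes the comparison explicit; the class name is a stand-in.

```java
// Approximate relative standard error for a Theta sketch with k nominal
// entries (valid once the sketch is in estimation mode).
class ThetaError {
  static double rse(int nominalEntries) {
    return 1.0 / Math.sqrt(nominalEntries);
  }
}
```

So k = 65536 gives roughly 0.39% RSE, k = 16384 roughly 0.78%, and the query-time default of 4096 roughly 1.56%: each factor-of-four reduction in size doubles the error.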

// The serializer should respect existing ordering to enable "early stop"
// optimisations on unions.
boolean shouldCompact = !value.isCompact();
boolean shouldOrder = value.isOrdered();
Contributor

Shouldn't we order if the value is not ordered?
boolean shouldOrder = value.isOrdered() -> !value.isOrdered()?

Current code just passes shouldOrder to compact(). In that case, we can just pass compact(value.isOrdered(), null)?

Member Author

I think those points are valid - but I thought that it would be best to leave the decision as to whether to order in the hands of the end user. The current version respects the input ordering.

Moreover, the sketch is not re-compacted if it is already in compacted form.

This is a rather subjective take on the problem, and I will go with your recommendation on this one @snleee - what do you think?

Contributor

I was simply commenting that

boolean shouldOrder = value.isOrdered();
if (shouldCompact) {
  return value.compact(shouldOrder, null).toByteArray();
}

is the same as

if (shouldCompact) {
  return value.compact(value.isOrdered(), null).toByteArray();
}

Since shouldCompact was not being used in this function. If value.isOrdered() is provided by the end user to indicate that we should sort, we can keep it as is.

Member Author

Yes, the ordering is set by the end user.


@davecromberge
Member Author

@snleee thank you for your review - I'll wait for your opinion on this comment before making any changes, if any.

@swaminathanmanish
Contributor

Thanks for the detailed response @davecromberge

@Jackie-Jiang Jackie-Jiang added enhancement documentation release-notes Referenced by PRs that need attention when compiling the next release notes performance labels Dec 5, 2023
@snleee snleee added the feature label Dec 5, 2023
@snleee snleee merged commit a9e3199 into apache:master Dec 5, 2023
19 checks passed
@jackjlli
Member

Hey @davecromberge @snleee @swaminathanmanish, thanks for making the contribution to theta sketch functionality! We really appreciate that! However, this PR introduces a backward incompatibility: the intermediate data type is switched from Sketch to ThetaSketchAccumulator, which causes existing tables running the distinctCountThetaSketch or distinctCountRawThetaSketch functions to fail during deployment when two different OSS versions are present on brokers and servers at the same time. This PR should have been marked as backward incompatible, and it is blocking our deployment at LinkedIn. Could you please adjust the code to make sure this enhancement won't break existing running tables?

@davecromberge
Member Author

Hi @jackjlli,
Apologies for blocking your deployment - I am looking at a fix to introduce backward compatibility in the protocol messages between server and broker.

@davecromberge
Member Author

@snleee @swaminathanmanish I have created the following draft PR to address the problem reported by @jackjlli:
#12288

Please let me know what you think. I successfully reproduced the problem in a local cluster and corrected it with the PR fix. I have assumed that this only applies to the Combine phase (merge) function as per:
https://docs.pinot.apache.org/developers/developers-and-contributors/extending-pinot/custom-aggregation-function

@mcvsubbu mcvsubbu added incompatible Indicate PR that introduces backward incompatibility upgrade-incompat PR may introduce incompatibility during upgrade of an installation and removed incompatible Indicate PR that introduces backward incompatibility labels Jan 22, 2024
7 participants