Theta Sketch Aggregation Enhancements #12042
Conversation
Introduces additional parameters to the DistinctCountThetaSketch aggregation function that give the end user more control over how sketches are merged. The defaults are selected to ensure that the behaviour remains unchanged relative to the current implementation. Furthermore, a custom accumulator object is added to ensure that pairwise union operations are avoided as much as possible. Instead, sketches can be accumulated and merged when a threshold is met.
Notes for reviewers: The changes in this PR introduce a different intermediate result type for the DistinctCountThetaSketch aggregation function. The reason for doing so is to eliminate the intermediate bookkeeping necessary to perform unions on two sketches at a time. In JMH benchmarks simulating this scenario, accumulating sketches prior to the union often yields a speedup by a factor of 3. Reference: Alternatives considered
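The batching idea can be sketched with a toy model. This is illustrative only, not the Pinot implementation: each "sketch" is modelled as a Set of hashed items, and the expensive union setup is paid once per batch rather than once per pairwise union.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy accumulator: buffer incoming "sketches" and perform one batched union
// when a threshold is reached, instead of a pairwise union on every add.
class BatchedAccumulator {
  private final List<Set<Long>> _buffer = new ArrayList<>();
  private final Set<Long> _union = new HashSet<>();
  private final int _threshold;

  BatchedAccumulator(int threshold) {
    _threshold = threshold;
  }

  void add(Set<Long> sketch) {
    _buffer.add(sketch);
    if (_buffer.size() >= _threshold) {
      unionAll();
    }
  }

  // Catch-all: flush whatever remains in the buffer, even below threshold.
  Set<Long> getResult() {
    unionAll();
    return _union;
  }

  private void unionAll() {
    for (Set<Long> sketch : _buffer) {
      _union.addAll(sketch);
    }
    _buffer.clear();
  }

  public static void main(String[] args) {
    BatchedAccumulator acc = new BatchedAccumulator(3);
    acc.add(Set.of(1L, 2L));
    acc.add(Set.of(2L, 3L));
    System.out.println(acc.getResult().size()); // prints 3 (distinct items)
  }
}
```

With threshold 2 (the default mentioned later in this thread), every second add triggers a merge, which mirrors the existing pairwise behaviour.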
Codecov Report
@@ Coverage Diff @@
## master #12042 +/- ##
============================================
+ Coverage 61.62% 61.66% +0.03%
Complexity 1152 1152
============================================
Files 2389 2390 +1
Lines 129824 129954 +130
Branches 20083 20100 +17
============================================
+ Hits 80009 80136 +127
+ Misses 43989 43979 -10
- Partials 5826 5839 +13
Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
This better aligns with the default nominal entries parameter used in the query aggregation function.
I'm converting this to a draft in order to gather feedback on the approach. I have explored point 2 in the alternatives section above, which might lead to a simpler implementation - see commit 2ed38f3. Whilst somewhat simpler, performance can degrade during the "merge" operation, where two UnionWraps behave in pairwise fashion. For this reason, I've reverted to my original approach.
Removes intermediate array list to buffer/accumulate sketch elements. Instead, inputs are fed directly to the underlying union. This ensures that the memory usage of the merge is kept under control.
…theta-sketch-accumulator
This reverts commit 2ed38f3.
Thanks for the detailed description and the reasoning behind this approach. Intuitively this approach makes sense. I took a first pass of the PR and have some high-level questions:
- There are 4 new params introduced. Have we quantified the gains for each of these params, and which one yields the largest gains? I'm assuming these params work independently of each other.
- We have the nominal entries param for a sketch (which is the number of entries in a sketch?). Curious if we have already experimented with tuning this param to figure out the gains? How can this parameter impact performance?
- Could you share the benchmark results/numbers for different values of the params?
}

// Performance optimisation: ensure that the minimum Theta is used for "early-stop".
_accumulator.sort(Comparator.comparingDouble(Sketch::getTheta));
Is the optimization here to stop the sort based on a theta threshold? Just trying to understand how early stop takes effect here.
Internally, a compact and ordered Theta sketch can be compared to a sorted array of K items, where K is often in the order of thousands.
To merge several sketches, the union operator retains the items from each array, which then need to be de-duplicated and stored where the item is less than the threshold, Theta. If Theta is chosen to be the minimum across those arrays, the union only has to process items up to the minimum Theta for each sketch in the merge, thus reducing de-duplication costs and potentially thousands of hash-collision checks.
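The explanation above can be sketched with a toy model (not the DataSketches internals). Each compact, ordered sketch is modelled as an ascending array of hashes in [0, 1) plus a theta value; the union only keeps hashes below the minimum theta, so a scan over an ordered sketch can cut off early.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of the "early stop" optimisation: only hashes below the minimum
// theta across all inputs survive the union. Because each input is ordered,
// the filter is effectively a prefix scan that stops at the first hash >= minTheta.
class EarlyStopUnion {
  static long retainedAfterUnion(List<double[]> orderedHashes, double[] thetas) {
    double minTheta = Arrays.stream(thetas).min().orElse(1.0);
    return orderedHashes.stream()
        .flatMapToDouble(Arrays::stream)
        .filter(h -> h < minTheta)  // items at or above minTheta never need de-duplication
        .distinct()
        .count();
  }

  public static void main(String[] args) {
    double[] a = {0.1, 0.4, 0.7};  // sketch with theta 0.5
    double[] b = {0.1, 0.6};       // sketch with theta 0.8
    // minTheta = 0.5, so only 0.1 and 0.4 are retained
    System.out.println(retainedAfterUnion(List.of(a, b), new double[]{0.5, 0.8}));
  }
}
```

Sorting the accumulator by theta ascending (as the quoted line does) puts the smallest theta first, so the union adopts that bound immediately rather than discovering it late in the merge.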
I also have this question, as I don't see the code for early-terminating the merge. I assume this happens within Union, or some sketch-side code where the sorted property of the sketches triggers the early stop?
I think it would be great to capture the explanation above as a comment in the code, to provide more insight into this optimization.
Done - see 90d4562.
// optimisations on unions.
boolean shouldCompact = !value.isCompact();
boolean shouldOrder = value.isOrdered();
if (shouldCompact) {
I assume shouldCompact defaults to true?
No - this is not the case; it depends on how the ingested sketches have been processed by the end user. It might be true for the empty and singleton sketch (a sketch of a single hashed item), but not for others.
The emphasis of these changes is that the serialiser does not alter the existing ordering of sketches. There is a small performance benefit to this: there is no need to re-compact or re-order sketches that are already in that state.
_accumulator.add(sketch);
_numInputs += 1;

if (_accumulator.size() >= _threshold) {
The main optimization here is to batch the sketch unions instead of doing many pairwise unions, to amortize the sort-merge cost, right? And that's where we tune this threshold parameter.
Yes, correct. The early-stop mechanism and internal book-keeping structure (a QuickSelect updatable sketch on heap) need only be created once for a single merge. The penalty the end user pays for this amortisation is that the merge can consume more memory.
_accumulator.add(sketch);
_numInputs += 1;

if (_accumulator.size() >= _threshold) {
Is there a final catch-all to do the union on the last batch, even if the threshold is not met? I guess it happens via
public Sketch getResult() {
return unionAll();
}
That's right - the result is extracted when the server returns a result to the broker or when the broker performs the final post aggregation operations.
accumulator.setOrdered(_intermediateOrdering);
accumulator.setThreshold(_accumulatorThreshold);
accumulator.setSetOperationBuilder(_setOperationBuilder);
mergedSketches.add(accumulator.getResult());
What would happen in the default case? I assume it would just do a union for every sketch pair, instead of the intermediate batching (default threshold = 2).
Yes, this is the default behaviour - I believe that this matches the existing implementation.
@@ -261,6 +261,11 @@ public static Sketch thetaSketchDiff(Object sketchObjectA, Object sketchObjectB)
  return diff.getResult(false, null, false);
}

@ScalarFunction(names = {"thetaSketchToString", "theta_sketch_to_string"})
Are these new functions added to take in the new parameters?
No, these are useful for debugging purposes. They can be used to inspect the preamble flags for sorting and compaction, either for sketches in storage or for sketches returned by the aggregation function.
int nominalEntries = parameters.getNominalEntries();
_updateSketchBuilder.setNominalEntries(nominalEntries);
_setOperationBuilder.setNominalEntries(nominalEntries);
// Sampling probability sets the initial value of Theta, defaults to 1.0
Is this sampling to skip entries in a sketch, while doing the sort/merge?
This sampling probability is useful to end users who wish to build sketches on the fly from raw values or from existing sketches. These users might wish to trim down the size of degenerate sketches (sketches that are below capacity). There are certain use cases (such as ordering by top ranked items) where the user might not care if the tail of the list is negatively impacted in terms of error.
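A toy model may help illustrate the tradeoff (this is illustrative arithmetic, not the DataSketches API): with sampling probability p, each hash is retained only if it falls below p, and the distinct-count estimate is scaled back up by 1/p. Smaller p means smaller sketches at the cost of higher error.

```java
// Toy model of sampling probability: hashes are uniform in [0, 1); a hash is
// retained only if it is below p, and the estimate rescales by 1/p.
class SampledEstimate {
  static double estimate(double[] hashes, double p) {
    long retained = 0;
    for (double h : hashes) {
      if (h < p) {
        retained++;
      }
    }
    return retained / p;
  }

  public static void main(String[] args) {
    double[] hashes = {0.05, 0.2, 0.45, 0.6, 0.9};
    // Retains 3 of 5 hashes, estimates 3 / 0.5 = 6.0 distinct items
    System.out.println(estimate(hashes, 0.5));
  }
}
```

This is why the feature suits long-tail use cases: degenerate (below-capacity) sketches shrink by roughly a factor of p, while the variance introduced matters least for the low-cardinality tail.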
_setOperationBuilder.setP(p);
_updateSketchBuilder.setP(p);
// Resize factor controls the size multiple that affects how fast the internal cache grows
ResizeFactor rf = parameters.getResizeFactor();
Curious how this is tuned and how it impacts performance?
The resize factor trades off memory consumption for performance. Sometimes the internal hash table of the sketch reaches capacity and must be resized, which is an expensive operation: all existing keys need to be rehashed.
Using a different resize factor controls how large the initial hash table is on construction, thereby trading off the cost of resize operations against potential over-allocation of memory on heap.
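The tradeoff can be shown with simple arithmetic (illustrative only, not the DataSketches internals): a larger resize factor reaches the target capacity in fewer, larger growth steps, so fewer expensive rehashes, at the cost of possible over-allocation on the final step.

```java
// Counts how many resize (rehash) operations a geometrically-growing hash
// table performs before reaching the target capacity.
class ResizeSteps {
  static int stepsToReach(int initialSize, int targetSize, int resizeFactor) {
    int size = initialSize;
    int steps = 0;
    while (size < targetSize) {
      size *= resizeFactor;
      steps++;
    }
    return steps;
  }

  public static void main(String[] args) {
    System.out.println(stepsToReach(128, 16384, 2)); // 7 rehashes, ends exactly at 16384
    System.out.println(stepsToReach(128, 16384, 8)); // 3 rehashes, but overshoots to 65536
  }
}
```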
Thank you for your review @swaminathanmanish. I found your questions very insightful. I should make clear at the outset that it has been difficult to thoroughly benchmark the work in this pull request in a test environment. Some of the performance improvements are knobs to turn, and how they behave in practice is speculative. Therefore I would like to propose that the parameters are retained and tested in a production environment, and subsequently removed should they show no user benefit. The downside to this approach is that backward compatibility will not be maintained should end users start to depend on them. Answers to your questions follow inline.
The largest gain can be realised by adjusting the sampling probability parameter. However, this is highly dependent on the use case and should only be used in certain circumstances. All parameters are independent of each other, and their defaults have been selected to retain the existing behaviour of the system. The performance gains measured in testing are between 25% and 50%. Where the results are sampled, the speed increases by 300%.
We have been using this extensively, but it does not help where there is a long tail of sketches whose retained items are fewer than nominal entries. For these cases, selecting lower nominal entries can impact the error across the board. Instead, the sampling probability allows the end user to curtail size and increase error on the long tail only, which, for some reports and queries, is a good tradeoff.
I'd like to do more testing in a production environment, but the default set of parameters can be up to 25% faster than the current implementation (through sorting and retaining unions). Adjusting others, such as sampling, has shown order-of-magnitude gains - but there are tradeoffs on accuracy that need to be considered.
…theta-sketch-accumulator
LGTM otherwise
@@ -261,6 +261,11 @@ public static Sketch thetaSketchDiff(Object sketchObjectA, Object sketchObjectB)
  return diff.getResult(false, null, false);
}

@ScalarFunction(names = {"thetaSketchToString", "theta_sketch_to_string"})
public static String thetaSketchToString(Object sketchObject) {
What do we return for this function? Can we add some examples to the commit notes?
This function returns a string summary of the current sketch state. Note that the function name is consistent with the equivalent function in Druid.
Examples include:
### HeapCompactOrderedSketch SUMMARY:
Estimate : 149586.73149344584
Upper Bound, 95% conf : 154287.5017892762
Lower Bound, 95% conf : 145028.6046846571
Theta (double) : 0.027382107751846067
Theta (long) : 252555366948521403
Theta (long) hex : 038141c4a515c5bb
EstMode? : true
Empty? : false
Array Size Entries : 4096
Retained Entries : 4096
Seed Hash : 93cc
### END SKETCH SUMMARY
and:
### HeapCompactOrderedSketch SUMMARY:
Estimate : 48249.113729035394
Upper Bound, 95% conf : 50358.736970106176
Lower Bound, 95% conf : 46227.35737896924
Theta (double) : 0.04377282475820978
Theta (long) : 403733047849016500
Theta (long) hex : 059a591165205cb4
EstMode? : true
Empty? : false
Array Size Entries : 2112
Retained Entries : 2112
Seed Hash : 93cc
### END SKETCH SUMMARY
These allow the end user to inspect whether a sketch is in estimation mode, whether the sketch is ordered, and the state of the sketch - whether updatable or compact.
Where is the best place to include these examples in the commit notes?
  // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
- public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;
+ public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 16384;
Is this configurable?
No, unfortunately this is not currently configurable, to my knowledge. The smaller value more closely resembles the default we select when querying the theta sketch, which is 4096.
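The cost of a smaller default can be quantified from the error table referenced in the code comment above: the Relative Standard Error of a Theta sketch in estimation mode is approximately 1/sqrt(k) for nominal entries k.

```java
// Approximate Relative Standard Error (RSE) of a Theta sketch in estimation
// mode as a function of nominal entries k: RSE ~ 1/sqrt(k).
class ThetaRse {
  static double rse(int nominalEntries) {
    return 1.0 / Math.sqrt(nominalEntries);
  }

  public static void main(String[] args) {
    System.out.printf("k=65536: %.5f%n", rse(65536)); // ~0.39% error
    System.out.printf("k=16384: %.5f%n", rse(16384)); // ~0.78% error
    System.out.printf("k=4096:  %.5f%n", rse(4096));  // ~1.56% error
  }
}
```

So dropping the serialisation default from 65536 to 16384 roughly doubles the worst-case relative error while quartering the sketch size, moving it closer to the 4096 query-time default.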
// The serializer should respect existing ordering to enable "early stop"
// optimisations on unions.
boolean shouldCompact = !value.isCompact();
boolean shouldOrder = value.isOrdered();
Shouldn't we order if the value is not ordered? i.e. boolean shouldOrder = value.isOrdered() -> !value.isOrdered()?
The current code just passes shouldOrder to compact(). In that case, can we just pass compact(value.isOrdered(), null)?
I think those points are valid - but I thought it would be best to leave the decision of whether to order in the hands of the end user. The current version respects the input ordering.
Moreover, the sketch is not re-compacted if it is already in compact form.
This is a rather subjective take on the problem, and I will go with your recommendation on this one @snleee - what do you think?
I was simply commenting that
boolean shouldOrder = value.isOrdered();
if (shouldCompact) {
  return value.compact(shouldOrder, null).toByteArray();
}
is the same as
if (shouldCompact) {
  return value.compact(value.isOrdered(), null).toByteArray();
}
since shouldOrder was not being used elsewhere in this function. If value.isOrdered() is provided by the end user to indicate that we should sort, we can keep it as is.
Yes, the ordering is set by the end user.
}

// Performance optimisation: ensure that the minimum Theta is used for "early-stop".
_accumulator.sort(Comparator.comparingDouble(Sketch::getTheta));
I also have this question, as I don't see the code for early-terminating the merge. I assume this happens within Union, or some sketch-side code where the sorted property of the sketches triggers the early stop?
I think it would be great to capture the explanation above as a comment in the code, to provide more insight into this optimization.
@snleee thank you for your review - I'll wait for your opinion on this comment before making any changes, if any.
Thanks for the detailed response @davecromberge
Hey @davecromberge @snleee @swaminathanmanish, thanks for making the contribution to theta sketch functionality! We really appreciate that! However, this PR introduces a backward incompatibility in that the intermediate data type is switched from
Hi @jackjlli,
@snleee @swaminathanmanish I have created the following draft PR to address the problem reported by @jackjlli. Please let me know what you think. I successfully reproduced the problem in a local cluster and corrected it with the PR fix. I have assumed that this only applies to the Combine phase (merge) function as per:
Introduces additional parameters to the DistinctCountThetaSketch aggregation function that give the end user more control over how sketches are aggregated at query time. The defaults are selected to ensure that the behaviour remains unchanged relative to the current implementation.
Furthermore, a custom accumulator object is added to ensure that pairwise union operations are avoided as much as possible. Instead, sketches can be accumulated and merged when a threshold is met.
This PR is a performance enhancement and can be tagged/labelled as such.
release-notes: