Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Concurrent Segment Search] shard_min_doc_count and shard_size should not be evaluated at the slice level #8860

Closed
jed326 opened this issue Jul 25, 2023 · 4 comments · Fixed by #9085

Comments

@jed326
Copy link
Collaborator

jed326 commented Jul 25, 2023

Subtask for #7357 to focus on the test failures related to shard size parameter:

REPRODUCE WITH: ./gradlew 'null' --tests "org.opensearch.search.aggregations.bucket.TermsShardMinDocCountIT.testShardMinDocCountSignificantTermsTest" -Dtests.seed=A749B9CB2092798B -Dtests.locale=en-IE -Dtests.timezone=Europe/Madrid -Druntime.java=17

java.lang.AssertionError: 
Expected: <0>
     but: was <1>
Expected :<0>
Actual   :<1>
<Click to see difference>


	at __randomizedtesting.SeedInfo.seed([A749B9CB2092798B:D251D3457B327719]:0)
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
	at org.junit.Assert.assertThat(Assert.java:964)
	at org.junit.Assert.assertThat(Assert.java:930)
	at org.opensearch.search.aggregations.bucket.TermsShardMinDocCountIT.testShardMinDocCountSignificantTermsTest(TermsShardMinDocCountIT.java:103)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)

REPRODUCE WITH: ./gradlew 'null' --tests "org.opensearch.search.aggregations.bucket.TermsShardMinDocCountIT.testShardMinDocCountTermsTest" -Dtests.seed=A749B9CB2092798B -Dtests.locale=en-IE -Dtests.timezone=Europe/Madrid -Druntime.java=17

java.lang.AssertionError: 
Expected: <0>
     but: was <2>
Expected :<0>
Actual   :<2>
<Click to see difference>


	at __randomizedtesting.SeedInfo.seed([A749B9CB2092798B:BD5353BCAD8763C8]:0)
	at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
	at org.junit.Assert.assertThat(Assert.java:964)
	at org.junit.Assert.assertThat(Assert.java:930)
	at org.opensearch.search.aggregations.bucket.TermsShardMinDocCountIT.testShardMinDocCountTermsTest(TermsShardMinDocCountIT.java:169)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	a
 - org.opensearch.search.aggregations.bucket.ShardSizeTermsIT.testShardSizeEqualsSizeDouble
 - org.opensearch.search.aggregations.bucket.ShardSizeTermsIT.testShardSizeEqualsSizeString
@jed326 jed326 added :test Adding or fixing a test distributed framework labels Jul 25, 2023
@jed326 jed326 self-assigned this Jul 25, 2023
@jed326 jed326 changed the title [Concurrent Segment Search] Fix TermsShardMinDocCountIT related tests [Concurrent Segment Search] Fix shard size parameter related tests Jul 25, 2023
@jed326 jed326 changed the title [Concurrent Segment Search] Fix shard size parameter related tests [Concurrent Segment Search] shard_min_doc_count and shard_size should not be evaluated at the slice level Aug 1, 2023
@jed326
Copy link
Collaborator Author

jed326 commented Aug 1, 2023

shard_size and shard_min_doc_count are being evaluated at the slice level which is leading to unexpected results in the tests causing failures.

High Level Overview:

  • buildAggregations creates a priority queue with size min(buckets, shard_size)
    int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
    PriorityQueue<B> ordered = buildPriorityQueue(size);
    • This is problem 1 -- shard_size is being evaluated at the slice level. This manifests in 2 ways. 1) the shards can return up to shard_size * slice_count buckets to the coordinator, so shard_size is not actually being enforced and 2) the doc_count of collected buckets may be inaccurate because the same term may exist on multiple slices but due to the shard_size limitation the same term bucket is not collected on all slices.
  • Iterates through buckets -- If docCount >= shard_min_doc_count for the bucket, then add the bucket to the priority queue
    • This is problem 2 -- for concurrent search we are checking the shard_min_doc_count at the slice level. Across slices docCount may add up to be >= shard_min_doc_count, however if that's not true for a given slice then a bucket is not created for the term.
  • Priority queue will drop items from it as new ones are added, ensuring we only end up with min(buckets, shard_size) buckets
  • At the coordinator level, we do not consume any buckets with less than min_doc_count docs:

High Level Solutions:

I have a few high level solutions in mind. The big problem is with shard_size. We should have some sort of limit on the number of buckets a slice can collect, but if we ignore shard_size at the slice level then it could grow unbounded. However, introducing any sort of slice level limit on bucket count (let's call this slice_size for now), can result in behavioral changes between concurrent and non-concurrent search use cases if the number of ordinals >> the slice_size. Specifically the doc_count could be inaccurate and missing some docs from some slices. This is analogous to how shard_size behaves in the 1 shard vs multiple shard case. This is true even if we introduce slice level settings, which I don't believe we should do because the user doesn't really have any control or knowledge over how documents are distributed over slices so these controls don't really mean anything to them.

Essentially there are a few different ways we can address both shard_min_doc_count and shard_size and we can combine the solutions in different ways.

shard_min_doc_count:

  1. No changes
  2. Ignore at the slice level
  3. Ignore at the slice level and apply at shard reduce
  4. Introduce new slice level setting + [3]

shard_size:

  1. No changes
  2. Ignore at the slice level
  3. Instead of applying shard_size at the slice level, we use some default higher than shard_size. For example, shard_size defaults to 1.2*size + 10.
  4. Ignore at the slice level, apply at the shard level reduce. This requires using the same comparitor for ordering purposes.
  5. Introduce new slice level setting + [4]

Currently my recommendation is [3] for shard_min_doc_count and [3] for shard_size. I'm split on if we should reduce the number of buckets down to shard_size at the shard level reduce. The intention of shard_size is to give users a way to get more accurate results, so with solution [3] for shard_size I don't see a purpose in doing the reduce here.

@jed326
Copy link
Collaborator Author

jed326 commented Aug 1, 2023

@sohami @reta @andrross Would like to get your input on this. It seems difficult to completely preserve the behavior between concurrent and non-concurrent search cases here.

@sohami
Copy link
Collaborator

sohami commented Aug 1, 2023

@jed326 Thanks for the analysis.

The big problem is with shard_size. We should have some sort of limit on the number of buckets a slice can collect, but if we ignore shard_size at the slice level then it could grow unbounded

I didn't understand what you mean my it could grow unbounded. For sequential case as well, the shard_size limit is applied after document collection is done on the shards and before returning the output (i.e. in buildAggregation). So if we ignore the shard_size parameter at slice level but apply it before sending it to the coordinator then still the same bounds can be applied.

I think for both the shard level parameter if we ignore these at slice level and then apply it as part of reduce, it should work as expected. What are the challenges for shard_size option 4 ?

@jed326
Copy link
Collaborator Author

jed326 commented Aug 1, 2023

I didn't understand what you mean my it could grow unbounded. For sequential case as well, the shard_size limit is applied after document collection is done on the shards and before returning the output (i.e. in buildAggregation).

You are right that at the shard level the same bounds would still be applied, so from the coordinator perspective concurrent and non-concurrent search would look the same. However, the unbounded growth I am concerned about here is basically the size of the priority queue at the slice level, which is also the number of buckets being returned at the slice level. Today this is bounded by the shard_size parameter, but if we ignore that then each slice would return a bucket for every single ordinal, which looks like a serious scaling issue to me. I think this is analogous to why shard_size parameter exists at all instead of just returning every single bucket to the coordinator and doing the size bound there.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Status: Done
Development

Successfully merging a pull request may close this issue.

4 participants