refactor: introduce pinot.broker.multistage.sort.exchange.copy.threshold for sort-exchange copy transformation#17328
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #17328 +/- ##
============================================
+ Coverage 63.23% 63.27% +0.04%
Complexity 1474 1474
============================================
Files 3152 3153 +1
Lines 187870 187914 +44
Branches 28762 28762
============================================
+ Hits 118793 118896 +103
+ Misses 59861 59805 -56
+ Partials 9216 9213 -3
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
yashmayya
left a comment
There was a problem hiding this comment.
I guess you meant to link to #12237 in the PR description?
The first is justified on the fact that we don't use k-merge sort, and therefore, the sending side sort is useless.
I don't understand the reason why the <10k rows condition was added.
I don't know the history here either, but maybe the reasoning was that beyond some arbitrary threshold (determined heuristically probably), the benefit of reducing the number of rows being sent and globally sorted is outweighed by the cost of duplicated sorting?
You are right. Updated |
60983e4 to
3b9062b
Compare
|
@yashmayya I modified this PR. Can you review it again? |
3b9062b to
929a4ac
Compare
| * NOTE: this file was generated using Calcite's code generator, but instead of pulling in all | ||
| * the dependencies for codegen we just manually generate it and check it in. If active development | ||
| * on this needs to happen, re-generate it using Calcite's generator. |
There was a problem hiding this comment.
This rule is not need it anymore. I don't know why it was decided to not use the generator, but we are actually using in other rules, so doesn't make much sense to have this generated class here
929a4ac to
69b04f2
Compare
| private ImmutableQueryEnvironment.Config getQueryEnvConf(HttpHeaders httpHeaders, Map<String, String> queryOptions, | ||
| long requestId) { | ||
| String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders); | ||
| boolean inferPartitionHint = _config.getProperty(CommonConstants.Broker.CONFIG_OF_INFER_PARTITION_HINT, |
There was a problem hiding this comment.
This is moved to pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironmentConfigFactory.java
69b04f2 to
cecedaf
Compare
yashmayya
left a comment
There was a problem hiding this comment.
Looks like there's a compilation issue in here.
- A new query option called
pinot.broker.multistage.sort.exchange.copy.threshold, which can be used to change the default threshold for all queries in the broker.- The query option
sortExchangeCopyThreshold, which can be used to change this threshold for specific queries.
What is our official recommendation going to be for tuning this threshold?
I think until we have k-merge implemented, we should disable this optimization. Whether we want to disable it by default or not is something we should discuss |
|
The compilation error should be fixed now |
pinot-query-planner/src/test/resources/queries/OrderByPlans.json
Outdated
Show resolved
Hide resolved
|
|
||
| /// A factory class to create QueryEnvironment.Config instances. | ||
| /// | ||
| /// This includes the ability to cus |
| public static ImmutableQueryEnvironment.Config create( | ||
| String database, Map<String, String> queryOptions, | ||
| long requestId, PinotConfiguration config, TableCache tableCache, WorkerManager workerManager, | ||
| @Nullable Set<String> defaultDisabledPlannerRules) { |
There was a problem hiding this comment.
Should this be nullable?
pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotQueryRuleSets.java
Show resolved
Hide resolved
...uery-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotSortExchangeCopyRule.java
Show resolved
Hide resolved
…se when limit is higher than 10k
…st to the correct query environment usage
f3b87f6 to
cb951ed
Compare
| return uncheckedParseInt(QueryOptionKey.REGEX_DICT_SIZE_THRESHOLD, regexDictSizeThreshold); | ||
| } | ||
|
|
||
| public static int getSortExchangeCopyThreshold(Map<String, String> options, int i) { |
There was a problem hiding this comment.
nit: i -> defaultSortExchangeCopyThreshold?
This PR adds a query option and a broker config to customize the threshold used in the optimization introduced in #12237.
This optimization affects queries like:
MSE blindly adds an exchange here (I think sometimes it is not needed, but that is not the point of this PR). We need to add a sort-with-limit in the receiver stage to preserve semantics, and we can also add one in the sender stage to avoid sending extra blocks and, eventually, enable k-merge sort.
The PR #12237 modifies the
PinotSortExchangeCopyRulerule only to add the sort-with-limit when:The first is justified by the fact that we don't use k-merge sort, and therefore, the sending side sort is useless.
I don't understand the reason why the <10k rows condition was added. We have found cases where huge limits are used by not adding this filter, we end up sending too many rows that we don't actually need.
Initially, this PR removed the second condition, but in order to be backward compatible, I changed the code to include:
pinot.broker.multistage.sort.exchange.copy.threshold, which can be used to change the default threshold for all queries in the broker.sortExchangeCopyThreshold, which can be used to change this threshold for specific queries.The change includes two tests that verify the query option can be used to increase the threshold and also to reduce it. When I created these tests I found that we are not using the default QueryEnvironment in the tests that derive from
QueryEnvironmentTestBase.java. This is because they called the default constructor ofQueryEnvironmentinstead of using the one created inMultiStageBrokerRequestHandler. In order to fix that, this PR creates aQueryEnvironmentFactorythat contains the code ofMultiStageBrokerRequestHandler.getQueryEnvConf.Notice that this PR only modifies the tests that derive from
ResourceBasedQueryPlansTest.testQueryExplainPlansAndQueryPlanConversion. There are other tests that should be upgraded as well, but they are not related to this PR