Support for multi-threaded Group By reducer for SQL. #6044
LGTM. Thanks
@@ -161,6 +161,10 @@
public static final double DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START = 100.0;
public static final String CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE = "pinot.broker.enable.query.limit.override";

// Config for number of threads to use for Broker reduce-phase.
public static final String CONFIG_OF_NUM_REDUCE_THREADS = "pinot.broker.num.reduce.threads";
public static final int DEFAULT_NUM_REDUCE_THREADS = 1; // TBD: Change to a more appropriate default (eg numCores).
Actually, on second thought, the default value of 1 is not good, as it will make the reduce phase sequential across concurrent queries. Moreover, if we add more threads, it may cause contention in high-QPS use cases.
While we tune this, perhaps the behavior should be:
- If the config is not explicitly specified, preserve the current behavior without an executor service, or perhaps use `MoreExecutors.newDirectExecutorService()`, which uses the calling thread to execute the Runnable.
- If the config is specified, use an executor service with the number of threads specified in the config.
Thoughts @kishoreg @Jackie-Jiang ?
(I have updated the PR with the approach above.)
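The two cases proposed above can be sketched roughly as follows. This is only an illustration, not the PR's code: `ReduceExecutors` and `forConfig` are hypothetical names, a `null` argument stands in for "config not explicitly specified", and `Runnable::run` is used as a standard-library stand-in for Guava's direct executor so the snippet has no dependencies.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Hypothetical helper (not part of the PR): chooses how reduce tasks are executed.
final class ReduceExecutors {
  private ReduceExecutors() {
  }

  // configuredThreads == null models "config not explicitly specified".
  static Executor forConfig(Integer configuredThreads) {
    if (configuredThreads == null) {
      // Run every task on the calling thread, which preserves the existing behavior.
      return Runnable::run;
    }
    // Config specified: use a pool with exactly that many threads.
    return Executors.newFixedThreadPool(configuredThreads);
  }
}
```

With the direct executor, each query reduces on its own request thread, so concurrent queries do not queue behind a single shared reduce thread.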
This config is right, but the implementation can be changed. It should be similar to what we have in the combine operator: the executor pool is cached or capped at a high number based on the number of cores, but the number of callables we create is based on this config.
public BrokerReduceService(PinotConfiguration config) {
  _maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
      CommonConstants.Broker.MAX_REDUCE_THREADS_PER_QUERY);
  LOGGER.info("Initializing BrokerReduceService with {} reduce threads.", _maxReduceThreadsPerQuery);
Log both the number of worker threads and the threads per query?
Also, if it is single-threaded, there is no need to launch the executor service.
Initially, I had Guava's `MoreExecutors.directExecutor()`, which uses the current thread to run the task, for the single-threaded case. I decided to keep it simple and have the exact same code for the single-threaded and multi-threaded cases (with the exception of the index table). We can revisit that if needed.
  long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
  countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
  for (Future future : futures) {
(Critical) You need to put the timeout exception into the query response; otherwise the response will be wrong and there is no way to detect that.
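One way to address this point is to check the boolean returned by `CountDownLatch.await(timeout, unit)`, which is `false` when the wait times out, and hand the resulting exception back to the caller. The sketch below is an assumption-laden illustration rather than the PR's implementation: `TimedReduce`, `runAll`, and the returned list (standing in for the exceptions attached to the query response) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not part of the PR): runs reduce tasks under a deadline.
final class TimedReduce {
  private TimedReduce() {
  }

  // Returns the exceptions the caller should attach to the query response (empty on success).
  static List<Exception> runAll(ExecutorService executor, List<Runnable> tasks, long timeoutMs) {
    List<Exception> errors = new ArrayList<>();
    CountDownLatch latch = new CountDownLatch(tasks.size());
    List<Future<?>> futures = new ArrayList<>();
    for (Runnable task : tasks) {
      Runnable wrapped = () -> {
        try {
          task.run();
        } finally {
          latch.countDown();
        }
      };
      futures.add(executor.submit(wrapped));
    }
    try {
      // await(...) returns false if the timeout elapsed before all tasks finished.
      if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
        errors.add(new TimeoutException("Timed out in broker reduce phase after " + timeoutMs + " ms"));
        cancelAll(futures);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
      errors.add(e);
      cancelAll(futures);
    }
    return errors;
  }

  private static void cancelAll(List<Future<?>> futures) {
    for (Future<?> future : futures) {
      future.cancel(true);
    }
  }
}
```

Ignoring the return value of `await`, as in the snippet under review, makes a timed-out reduce indistinguishable from a complete one.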
Codecov Report
@@ Coverage Diff @@
## master #6044 +/- ##
==========================================
+ Coverage 66.44% 72.86% +6.41%
==========================================
Files 1075 1225 +150
Lines 54773 57848 +3075
Branches 8168 8528 +360
==========================================
+ Hits 36396 42150 +5754
+ Misses 15700 12948 -2752
- Partials 2677 2750 +73
I still feel that using a fixed thread pool for the single-threaded case can add overhead to the reducer, but if the perf numbers show no difference, I'm okay with it.
The existing implementation of the Broker reduce phase is single-threaded. For group-by queries where large responses are sent back from multiple servers, this could become a bottleneck. Given that brokers are generally light on CPU usage, making the reduce phase multi-threaded would be a good idea to boost performance. This PR adds a multi-threaded implementation of the Group-By reducer for SQL.
- Added an executor service in BrokerReduceService that can be used for multi-threaded execution of the broker reduce phase. It is initialized with the number of threads set to `Runtime.getRuntime().availableProcessors()`.
- Added a broker-side config to specify the max number of threads per query to be used for the reduce phase: `pinot.broker.max.reduce.threads.per.query`. It has the same default value as the server-side combine phase: `Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2))`. To revert to single-threaded reduce, set this config to 1.
- The GroupByDataTableReducer uses the following algorithm to determine the number of threads to use in the reduce phase (per query):
  - If there are fewer than 2 data tables to reduce, it runs single-threaded.
  - Otherwise, it uses `Math.min(pinot.broker.max.reduce.threads.per.query, numDataTables)`.
- For testing, the number of reduce threads is explicitly set to be > 1 to ensure functional correctness is tested.
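The thread-count rules in the description can be written out as a small sketch. The formulas come from the description above; the class and method names (`ReduceThreads`, `defaultMaxThreadsPerQuery`, `numThreadsForQuery`) are hypothetical and chosen only for illustration.

```java
// Hypothetical helper (not part of the PR): mirrors the thread-count rules in the description.
final class ReduceThreads {
  private ReduceThreads() {
  }

  // Default cap, same formula as the server-side combine phase.
  static int defaultMaxThreadsPerQuery(int availableProcessors) {
    return Math.max(1, Math.min(10, availableProcessors / 2));
  }

  // Number of threads used to reduce one query's data tables.
  static int numThreadsForQuery(int maxThreadsPerQuery, int numDataTables) {
    if (numDataTables < 2) {
      return 1; // nothing to parallelize with zero or one data table
    }
    return Math.min(maxThreadsPerQuery, numDataTables);
  }
}
```

For example, with 8 available processors the default cap is 4, so a query with 3 data tables would use 3 threads and a query with 20 data tables would use 4.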
I agree that it will add overhead. The question is how much, and whether it is worth complicating the code with multiple implementations (one for single-threaded and one for multi-threaded). Let's use this as a baseline and improve if we see a degradation.