
Support for multi-threaded Group By reducer for SQL. #6044

Merged
merged 1 commit into from
Oct 14, 2020

Conversation

@mayankshriv (Contributor) commented Sep 22, 2020

The existing implementation of the Broker reduce phase is single-threaded.
For group-by queries where large responses are sent back from multiple servers,
this can become a bottleneck.

Given that brokers are generally light on CPU usage, making the reduce phase
multi-threaded is a good way to boost performance. This PR adds a multi-threaded
implementation of the Group-By reducer for SQL.

  • Added an executor service in BrokerReduceService that can be used for multi-threaded
    execution of the broker reduce phase. It is initialized with the number of threads set to
    `Runtime.getRuntime().availableProcessors()`.

  • Added a broker-side config, `pinot.broker.max.reduce.threads.per.query`, to specify the
    maximum number of threads per query to use in the reduce phase. Its default value matches
    the server-side combine phase:
    `Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2))`

    To revert to single-threaded reduce, set this config to 1.

  • The GroupByDataTableReducer uses the following algorithm to determine the number of
    threads to use in the reduce phase (per query):

    • If there are fewer than 2 data tables to reduce, it uses a single-threaded run.
    • Otherwise, it uses `Math.min(pinot.broker.max.reduce.threads.per.query, numDataTables)`.

  • For testing, the number of reduce threads is explicitly set to > 1 to ensure functional
    correctness is exercised.
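The per-query thread-count selection described above can be sketched as a small helper (a sketch only: the class and method names and the hard-coded defaults are illustrative, not Pinot's actual identifiers):

```java
public final class ReduceThreadsSketch {
  // Default cap, mirroring the server-side combine phase formula quoted above.
  static final int DEFAULT_MAX_REDUCE_THREADS_PER_QUERY =
      Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2));

  // Per-query thread count: single-threaded for fewer than 2 data tables,
  // otherwise capped by the broker config value.
  static int chooseNumReduceThreads(int numDataTables, int maxReduceThreadsPerQuery) {
    if (numDataTables < 2) {
      return 1;
    }
    return Math.min(maxReduceThreadsPerQuery, numDataTables);
  }

  public static void main(String[] args) {
    System.out.println(chooseNumReduceThreads(1, 8));   // 1
    System.out.println(chooseNumReduceThreads(100, 8)); // 8
    System.out.println(chooseNumReduceThreads(4, 8));   // 4
  }
}
```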


@kishoreg (Member) left a comment:

LGTM. Thanks

@@ -161,6 +161,10 @@
public static final double DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START = 100.0;
public static final String CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE = "pinot.broker.enable.query.limit.override";

// Config for number of threads to use for Broker reduce-phase.
public static final String CONFIG_OF_NUM_REDUCE_THREADS = "pinot.broker.num.reduce.threads";
public static final int DEFAULT_NUM_REDUCE_THREADS = 1; // TBD: Change to a more appropriate default (eg numCores).
@mayankshriv (Author) replied on Sep 22, 2020:

Actually, on second thought, the default value of 1 is not good, as it makes the reduce across concurrent queries sequential. Moreover, if we add more threads, it may cause contention in high-QPS use cases.

While we tune this, perhaps the behavior should be:

  • If the config is not explicitly specified, preserve the current behavior without an executor service, or perhaps use `MoreExecutors.newDirectExecutorService()`, which uses the calling thread to execute the Runnable.
  • If the config is specified, use an executor service with the number of threads specified in the config.

Thoughts @kishoreg @Jackie-Jiang ?

(I have updated the PR with the approach above).
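The fallback behavior proposed here could look roughly like the following (a sketch using a plain JDK `Executor`; `Runnable::run` stands in for Guava's `MoreExecutors.newDirectExecutorService()` to keep the example dependency-free, and the null-means-unset config handling is illustrative):

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public final class ReduceExecutorSketch {
  // If the config is absent, run reduce work on the calling thread
  // (the same effect as Guava's direct executor), preserving the old
  // single-threaded behavior. Otherwise, use a fixed-size pool.
  static Executor buildReduceExecutor(Integer configuredNumThreads) {
    if (configuredNumThreads == null) {
      return Runnable::run; // direct executor: task runs on the caller's thread
    }
    return Executors.newFixedThreadPool(configuredNumThreads);
  }

  public static void main(String[] args) {
    Executor direct = buildReduceExecutor(null);
    Thread caller = Thread.currentThread();
    direct.execute(() -> System.out.println(Thread.currentThread() == caller)); // true
  }
}
```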

(Member) replied:

This config is right, but the implementation can be changed. It should be similar to what we have in the combine operator: the executor pool is cached or capped at a high number based on the number of cores, but the number of callables we create is based on this config.

public BrokerReduceService(PinotConfiguration config) {
  _maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
      CommonConstants.Broker.MAX_REDUCE_THREADS_PER_QUERY);
  LOGGER.info("Initializing BrokerReduceService with {} reduce threads.", _maxReduceThreadsPerQuery);
(Contributor) replied:

Log both the number of worker threads and the threads per query?
Also, if it is single-threaded, there is no need to launch the executor service.

@mayankshriv (Author) replied:

Initially, I had Guava's `MoreExecutors.directExecutor()`, which uses the current thread to run the task in the single-thread case. I decided to keep it simple and have the exact same code for the single- and multi-threaded cases (with the exception of the index table). We can revisit that if needed.

long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
for (Future future : futures) {
(Contributor) commented:

(Critical) You need to put the timeout exception into the query response, or the response will be wrong and there will be no way to detect that.
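A hedged sketch of what this review comment asks for (the `Response` shape and all names here are hypothetical stand-ins, not Pinot's actual `BrokerResponse` API): when the latch times out or the wait is interrupted, record an exception in the response instead of silently returning a partial result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public final class ReduceTimeoutSketch {
  // Hypothetical stand-in for the broker's query response object.
  static final class Response {
    final List<String> exceptions = new ArrayList<>();
  }

  // Wait for the reduce workers; on timeout or interrupt, attach an
  // exception to the response so callers can detect the failure.
  static void awaitReduce(CountDownLatch latch, long timeoutMs, Response response) {
    try {
      if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
        response.exceptions.add("Timed out in broker reduce phase after " + timeoutMs + " ms");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      response.exceptions.add("Interrupted in broker reduce phase");
    }
  }

  public static void main(String[] args) {
    Response response = new Response();
    awaitReduce(new CountDownLatch(1), 10, response); // latch never counts down
    System.out.println(response.exceptions.size()); // 1
  }
}
```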

@codecov-io commented Oct 13, 2020

Codecov Report

Merging #6044 into master will increase coverage by 6.41%.
The diff coverage is 60.00%.


@@            Coverage Diff             @@
##           master    #6044      +/-   ##
==========================================
+ Coverage   66.44%   72.86%   +6.41%     
==========================================
  Files        1075     1225     +150     
  Lines       54773    57848    +3075     
  Branches     8168     8528     +360     
==========================================
+ Hits        36396    42150    +5754     
+ Misses      15700    12948    -2752     
- Partials     2677     2750      +73     
Flag Coverage Δ
#integration 45.28% <49.14%> (?)
#unittests 64.03% <38.09%> (?)

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...ot/broker/broker/AllowAllAccessControlFactory.java 100.00% <ø> (ø)
.../helix/BrokerUserDefinedMessageHandlerFactory.java 52.83% <0.00%> (-13.84%) ⬇️
...ava/org/apache/pinot/client/AbstractResultSet.java 53.33% <0.00%> (-3.81%) ⬇️
.../main/java/org/apache/pinot/client/Connection.java 44.44% <0.00%> (-4.40%) ⬇️
.../org/apache/pinot/client/ResultTableResultSet.java 24.00% <0.00%> (-10.29%) ⬇️
...not/common/assignment/InstancePartitionsUtils.java 78.57% <ø> (+5.40%) ⬆️
.../apache/pinot/common/exception/QueryException.java 90.27% <ø> (+5.55%) ⬆️
...pinot/common/function/AggregationFunctionType.java 98.27% <ø> (-1.73%) ⬇️
.../pinot/common/function/DateTimePatternHandler.java 83.33% <ø> (ø)
...ot/common/function/FunctionDefinitionRegistry.java 88.88% <ø> (+44.44%) ⬆️
... and 974 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 86ce7c6...bb955b5.

@mayankshriv mayankshriv force-pushed the mt-broker branch 3 times, most recently from bb955b5 to e20f784 Compare October 13, 2020 23:06
@Jackie-Jiang (Member) left a comment:

I still feel that using a fixed thread pool for the single-threaded case can add overhead to the reducer, but if the perf shows no difference, I'm okay with it.

@mayankshriv (Author) replied:

I still feel that using a fixed thread pool for the single-threaded case can add overhead to the reducer, but if the perf shows no difference, I'm okay with it.

I agree that it will add overhead. The question is how much, and whether it is worth complicating the code with multiple implementations (one for single and one for multiple threads). Let's use this as a baseline and improve it if we see a degradation.

@mayankshriv mayankshriv merged commit a910f5d into apache:master Oct 14, 2020
@mayankshriv mayankshriv deleted the mt-broker branch October 14, 2020 03:20