Support for multi-threaded Group By reducer for SQL. #6044

Merged (1 commit, Oct 14, 2020)
@@ -102,7 +102,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {

protected final AtomicLong _requestIdGenerator = new AtomicLong();
protected final BrokerRequestOptimizer _brokerRequestOptimizer = new BrokerRequestOptimizer();
protected final BrokerReduceService _brokerReduceService = new BrokerReduceService();
protected final BrokerReduceService _brokerReduceService;

protected final String _brokerId;
protected final long _brokerTimeoutMs;
@@ -145,6 +145,7 @@ public BaseBrokerRequestHandler(PinotConfiguration config, RoutingManager routin
_numDroppedLog = new AtomicInteger(0);
_numDroppedLogRateLimiter = RateLimiter.create(1.0);

_brokerReduceService = new BrokerReduceService(_config);
LOGGER
.info("Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
_brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
@@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.broker.api.RequestStatistics;
@@ -69,6 +70,7 @@ public void start() {
@Override
public synchronized void shutDown() {
_queryRouter.shutDown();
_brokerReduceService.shutDown();
}

@Override
@@ -104,8 +106,9 @@ protected BrokerResponse processBrokerRequest(long requestId, BrokerRequest orig
int numServersResponded = dataTableMap.size();

long reduceStartTimeNs = System.nanoTime();
long reduceTimeOutMs = timeoutMs - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - scatterGatherStartTimeNs);
BrokerResponseNative brokerResponse =
_brokerReduceService.reduceOnDataTable(originalBrokerRequest, dataTableMap, _brokerMetrics);
_brokerReduceService.reduceOnDataTable(originalBrokerRequest, dataTableMap, reduceTimeOutMs, _brokerMetrics);
final long reduceTimeNanos = System.nanoTime() - reduceStartTimeNs;
requestStatistics.setReduceTimeNanos(reduceTimeNanos);
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE, reduceTimeNanos);
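
A note on the new reduceTimeOutMs argument above: the reduce phase is only granted whatever portion of the overall query timeout the scatter-gather phase has not already spent. A minimal sketch of that budget calculation, assuming timeoutMs and scatterGatherStartTimeNs are the handler-scoped values used in the diff:

import java.util.concurrent.TimeUnit;

// Illustrative only: how much of the overall query timeout is left for the broker reduce phase.
static long remainingReduceBudgetMs(long timeoutMs, long scatterGatherStartTimeNs) {
  long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - scatterGatherStartTimeNs);
  // e.g. a 10_000 ms query timeout with 7_500 ms already spent on scatter-gather leaves ~2_500 ms.
  return timeoutMs - elapsedMs;
}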
@@ -166,6 +166,11 @@ public static class Broker {
public static final double DEFAULT_BROKER_MIN_RESOURCE_PERCENT_FOR_START = 100.0;
public static final String CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE = "pinot.broker.enable.query.limit.override";

// Config for number of threads to use for Broker reduce-phase.
public static final String CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY = "pinot.broker.max.reduce.threads.per.query";
public static final int DEFAULT_MAX_REDUCE_THREADS_PER_QUERY =
Math.max(1, Math.min(10, Runtime.getRuntime().availableProcessors() / 2)); // Same logic as CombineOperatorUtils

public static class Request {
public static final String PQL = "pql";
public static final String SQL = "sql";
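
A quick illustration of the default above (a sketch, not part of the patch): the per-query reduce parallelism defaults to half the available cores, clamped to the range [1, 10], and can be overridden through the pinot.broker.max.reduce.threads.per.query broker config key.

// Mirrors DEFAULT_MAX_REDUCE_THREADS_PER_QUERY: clamp(cores / 2, 1, 10), same logic as CombineOperatorUtils.
static int defaultMaxReduceThreadsPerQuery(int availableProcessors) {
  return Math.max(1, Math.min(10, availableProcessors / 2));
}
// defaultMaxReduceThreadsPerQuery(4)  -> 2
// defaultMaxReduceThreadsPerQuery(32) -> 10 (capped at 10)
// defaultMaxReduceThreadsPerQuery(1)  -> 1  (never below 1)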
@@ -63,7 +63,7 @@ public class AggregationDataTableReducer implements DataTableReducer {
@Override
public void reduceAndSetResults(String tableName, DataSchema dataSchema,
Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
BrokerMetrics brokerMetrics) {
DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
if (dataTableMap.isEmpty()) {
if (_responseFormatSql) {
DataSchema resultTableSchema =
@@ -18,9 +18,13 @@
*/
package org.apache.pinot.core.query.reduce;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -31,13 +35,17 @@
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.query.request.context.ExpressionContext;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
@@ -47,8 +55,36 @@
@ThreadSafe
public class BrokerReduceService {

private static final Logger LOGGER = LoggerFactory.getLogger(BrokerReduceService.class);

// brw -> Shorthand for broker reduce worker threads.
private static final String REDUCE_THREAD_NAME_FORMAT = "brw-%d";

// Set the reducer priority higher than NORM but lower than MAX, because if a query is complete
// we want to deserialize and return the response as soon as possible. This is the same as the server-side 'pqr' threads.
protected static final int QUERY_RUNNER_THREAD_PRIORITY = 7;

private final ExecutorService _reduceExecutorService;
private final int _maxReduceThreadsPerQuery;

public BrokerReduceService(PinotConfiguration config) {
_maxReduceThreadsPerQuery = config.getProperty(CommonConstants.Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY,
CommonConstants.Broker.DEFAULT_MAX_REDUCE_THREADS_PER_QUERY);

int numThreadsInExecutorService = Runtime.getRuntime().availableProcessors();
LOGGER.info("Initializing BrokerReduceService with {} threads, and {} max reduce threads.",
numThreadsInExecutorService, _maxReduceThreadsPerQuery);

ThreadFactory reduceThreadFactory =
new ThreadFactoryBuilder().setDaemon(false).setPriority(QUERY_RUNNER_THREAD_PRIORITY)
.setNameFormat(REDUCE_THREAD_NAME_FORMAT).build();

// ExecutorService is initialized with numThreads same as availableProcessors.
_reduceExecutorService = Executors.newFixedThreadPool(numThreadsInExecutorService, reduceThreadFactory);
}

public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest,
Map<ServerRoutingInstance, DataTable> dataTableMap, @Nullable BrokerMetrics brokerMetrics) {
Map<ServerRoutingInstance, DataTable> dataTableMap, long reduceTimeOutMs, @Nullable BrokerMetrics brokerMetrics) {
if (dataTableMap.size() == 0) {
// Empty response.
return BrokerResponseNative.empty();
@@ -189,8 +225,8 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest,

QueryContext queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest);
DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(queryContext);
dataTableReducer
.reduceAndSetResults(tableName, cachedDataSchema, dataTableMap, brokerResponseNative, brokerMetrics);
dataTableReducer.reduceAndSetResults(tableName, cachedDataSchema, dataTableMap, brokerResponseNative,
new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs), brokerMetrics);
updateAlias(queryContext, brokerResponseNative);
return brokerResponseNative;
}
@@ -219,4 +255,8 @@ private static void updateAlias(QueryContext queryContext, BrokerResponseNative
}
}
}

public void shutDown() {
_reduceExecutorService.shutdownNow();
}
}
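
Taken together, the lifecycle of the reworked service looks roughly like the sketch below, using only the API visible in this diff; the config, brokerRequest, dataTableMap, reduceTimeOutMs, and brokerMetrics values are assumed to be supplied by the request handler, as in the hunks above.

// Constructed once per broker, sizing the shared reduce thread pool from the broker config.
BrokerReduceService reduceService = new BrokerReduceService(config);

// Per query: reduce the per-server DataTables within the remaining time budget.
BrokerResponseNative response =
    reduceService.reduceOnDataTable(brokerRequest, dataTableMap, reduceTimeOutMs, brokerMetrics);

// On broker shutdown, alongside the query router: stop the shared reduce executor.
reduceService.shutDown();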
@@ -37,9 +37,9 @@ public interface DataTableReducer {
* @param dataSchema schema from broker reduce service
* @param dataTableMap map of servers to data tables
* @param brokerResponseNative broker response
* @param reducerContext DataTableReducer context
* @param brokerMetrics broker metrics
*/
void reduceAndSetResults(String tableName, DataSchema dataSchema,
Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
BrokerMetrics brokerMetrics);
void reduceAndSetResults(String tableName, DataSchema dataSchema, Map<ServerRoutingInstance, DataTable> dataTableMap,
BrokerResponseNative brokerResponseNative, DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics);
}
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.reduce;

import java.util.concurrent.ExecutorService;


/**
* POJO class to encapsulate DataTableReducer context information
*/
public class DataTableReducerContext {

private final ExecutorService _executorService;
private final int _maxReduceThreadsPerQuery;
private final long _reduceTimeOutMs;

/**
* Constructor for the class.
*
* @param executorService Executor service to use for DataTableReducer
* @param maxReduceThreadsPerQuery Max number of threads to use for reduce phase
* @param reduceTimeOutMs Reduce Phase timeOut in ms
*/
public DataTableReducerContext(ExecutorService executorService, int maxReduceThreadsPerQuery, long reduceTimeOutMs) {
_executorService = executorService;
_maxReduceThreadsPerQuery = maxReduceThreadsPerQuery;
_reduceTimeOutMs = reduceTimeOutMs;
}

public ExecutorService getExecutorService() {
return _executorService;
}

public int getMaxReduceThreadsPerQuery() {
return _maxReduceThreadsPerQuery;
}

public long getReduceTimeOutMs() {
return _reduceTimeOutMs;
}
}
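
As a rough sketch (hypothetical, not part of this patch) of how a reducer implementation might consume this context: the executor is shared across all queries, getMaxReduceThreadsPerQuery() caps the tasks submitted for any single query, and getReduceTimeOutMs() bounds how long the reduce is allowed to wait.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: fan reduce work out on the shared executor, bounded per query, with a deadline.
static void reduceInParallel(DataTableReducerContext reducerContext, int numPartitions)
    throws InterruptedException {
  int numTasks = Math.min(numPartitions, reducerContext.getMaxReduceThreadsPerQuery());
  CountDownLatch latch = new CountDownLatch(numTasks);
  for (int i = 0; i < numTasks; i++) {
    reducerContext.getExecutorService().submit(() -> {
      try {
        // reduce one slice of the per-server responses here (hypothetical per-task work)
      } finally {
        latch.countDown();
      }
    });
  }
  // Give up waiting once the reduce-phase budget handed down by the broker runs out.
  latch.await(reducerContext.getReduceTimeOutMs(), TimeUnit.MILLISECONDS);
}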
@@ -61,7 +61,7 @@ public class DistinctDataTableReducer implements DataTableReducer {
@Override
public void reduceAndSetResults(String tableName, DataSchema dataSchema,
Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
BrokerMetrics brokerMetrics) {
DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
// DISTINCT is implemented as an aggregation function in the execution engine. Just like
// other aggregation functions, DISTINCT returns its result as a single object
// (of type DistinctTable) serialized by the server into the DataTable and deserialized