Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pinot.broker.requesthandler.BaseSingleStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandlerDelegate;
import org.apache.pinot.broker.requesthandler.BrokerRequestIdGenerator;
import org.apache.pinot.broker.requesthandler.GrpcBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.MultiStageBrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.MultiStageQueryThrottler;
Expand Down Expand Up @@ -361,12 +362,14 @@ public void start()

// Create Broker request handler.
String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
BrokerRequestIdGenerator requestIdGenerator = new BrokerRequestIdGenerator();
String brokerRequestHandlerType =
_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler;
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
singleStageBrokerRequestHandler = new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, _failureDetector, _resourceUsageAccountant);
singleStageBrokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, _failureDetector, _resourceUsageAccountant);
} else {
// Default request handler type, i.e. netty
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
Expand All @@ -376,9 +379,9 @@ public void start()
tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX);
}
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
_queryQuotaManager, tableCache, nettyDefaults, tlsDefaults, _serverRoutingStatsManager,
_failureDetector, _resourceUsageAccountant);
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, nettyDefaults, tlsDefaults,
_serverRoutingStatsManager, _failureDetector, _resourceUsageAccountant);
}
MultiStageBrokerRequestHandler multiStageBrokerRequestHandler = null;
QueryDispatcher queryDispatcher = null;
Expand All @@ -390,14 +393,16 @@ public void start()
// TODO: decouple protocol and engine selection.
queryDispatcher = createQueryDispatcher(_brokerConf);
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
_queryQuotaManager, tableCache, _multiStageQueryThrottler, _failureDetector, _resourceUsageAccountant);
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, _multiStageQueryThrottler, _failureDetector,
_resourceUsageAccountant);
}
TimeSeriesRequestHandler timeSeriesRequestHandler = null;
if (StringUtils.isNotBlank(_brokerConf.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
Preconditions.checkNotNull(queryDispatcher, "Multistage Engine should be enabled to use time-series engine");
timeSeriesRequestHandler = new TimeSeriesRequestHandler(_brokerConf, brokerId, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, queryDispatcher, _resourceUsageAccountant);
timeSeriesRequestHandler =
new TimeSeriesRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache, queryDispatcher, _resourceUsageAccountant);
}

LOGGER.info("Initializing PinotFSFactory");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,20 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
*/
protected final Map<Long, String> _clientQueryIds;

public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, RoutingManager routingManager,
public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
ThreadResourceUsageAccountant resourceUsageAccountant) {
_config = config;
_brokerId = brokerId;
_requestIdGenerator = requestIdGenerator;
_routingManager = routingManager;
_accessControlFactory = accessControlFactory;
_queryQuotaManager = queryQuotaManager;
_tableCache = tableCache;
_brokerMetrics = BrokerMetrics.get();
_brokerQueryEventListener = BrokerQueryEventListenerFactory.getBrokerQueryEventListener();
_trackedHeaders = BrokerQueryEventListenerFactory.getTrackedHeaders();
_requestIdGenerator = new BrokerRequestIdGenerator(brokerId);
_brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
_enableRowColumnLevelAuth = config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_ROW_COLUMN_LEVEL_AUTH,
Broker.DEFAULT_BROKER_ENABLE_ROW_COLUMN_LEVEL_AUTH);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,11 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
protected LogicalTableRouteProvider _logicalTableRouteProvider;

public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String brokerId,
RoutingManager routingManager, AccessControlFactory accessControlFactory,
QueryQuotaManager queryQuotaManager, TableCache tableCache, ThreadResourceUsageAccountant accountant) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, accountant);
BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
ThreadResourceUsageAccountant accountant) {
super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache,
accountant);
_disableGroovy = _config.getProperty(Broker.DISABLE_GROOVY, Broker.DEFAULT_DISABLE_GROOVY);
_useApproximateFunction = _config.getProperty(Broker.USE_APPROXIMATE_FUNCTION, false);
_defaultHllLog2m = _config.getProperty(CommonConstants.Helix.DEFAULT_HYPERLOGLOG_LOG2M_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,26 @@
*/
package org.apache.pinot.broker.requesthandler;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

/**
* An ID generator to produce a global unique identifier for each query, used in v1/v2 engine for tracking and
* inter-stage communication(v2 only). It's guaranteed by:
* <ol>
* <li>
* Using a mask computed using the hash-code of the broker-id to ensure two brokers don't arrive at the same
* requestId. This mask becomes the most significant 9 digits (in base-10).
* </li>
* <li>
* Using a auto-incrementing counter for the least significant 9 digits (in base-10).
* </li>
* </ol>
*/

/// An ID generator to produce a global unique identifier for each query, used in single-stage/multi-stage engine for
/// tracking and inter-stage communication (multi-stage engine only). It's guaranteed by:
/// - Using a random number to ensure two brokers (or restarted broker) don't arrive at the same request ID. This random
/// number becomes the most significant 10 digits of the request ID (in base-10).
/// - Using an auto-incrementing counter for the least significant 9 digits (in base-10).
public class BrokerRequestIdGenerator {
private static final long OFFSET = 1_000_000_000L;
private final long _mask;
private final long _base;
private final AtomicLong _incrementingId = new AtomicLong(0);

public BrokerRequestIdGenerator(String brokerId) {
_mask = ((long) (brokerId.hashCode() & Integer.MAX_VALUE)) * OFFSET;
public BrokerRequestIdGenerator() {
_base = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE) * OFFSET;
}

public long get() {
long normalized = (_incrementingId.getAndIncrement() & Long.MAX_VALUE) % OFFSET;
return _mask + normalized;
return _base + normalized;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ public class GrpcBrokerRequestHandler extends BaseSingleStageBrokerRequestHandle
private final FailureDetector _failureDetector;

// TODO: Support TLS
public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId, RoutingManager routingManager,
public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
FailureDetector failureDetector, ThreadResourceUsageAccountant accountant) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, accountant);
super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache,
accountant);
_streamingReduceService = new StreamingReduceService(config);
_streamingQueryClient = new PinotServerStreamingQueryClient(GrpcConfig.buildGrpcQueryConfig(config));
_failureDetector = failureDetector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,17 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final ExecutorService _queryCompileExecutor;
protected final long _extraPassiveTimeoutMs;

public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId, RoutingManager routingManager,
public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
MultiStageQueryThrottler queryThrottler, FailureDetector failureDetector,
ThreadResourceUsageAccountant accountant) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, accountant);
super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache,
accountant);
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
_workerManager = new WorkerManager(_brokerId, hostname, port, _routingManager);
TlsConfig tlsConfig = config.getProperty(
CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED,
TlsConfig tlsConfig = config.getProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_TLS_ENABLED,
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ? TlsUtils.extractTlsConfig(config,
CommonConstants.Broker.BROKER_TLS_PREFIX) : null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,12 @@ public class SingleConnectionBrokerRequestHandler extends BaseSingleStageBrokerR
private final FailureDetector _failureDetector;

public SingleConnectionBrokerRequestHandler(PinotConfiguration config, String brokerId,
RoutingManager routingManager, AccessControlFactory accessControlFactory,
QueryQuotaManager queryQuotaManager, TableCache tableCache, NettyConfig nettyConfig, TlsConfig tlsConfig,
ServerRoutingStatsManager serverRoutingStatsManager, FailureDetector failureDetector,
ThreadResourceUsageAccountant accountant) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, accountant);
BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
NettyConfig nettyConfig, TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager,
FailureDetector failureDetector, ThreadResourceUsageAccountant accountant) {
super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache,
accountant);
_brokerReduceService = new BrokerReduceService(_config, accountant);
_queryRouter = new QueryRouter(_brokerId, _brokerMetrics, nettyConfig, tlsConfig, serverRoutingStatsManager);
_failureDetector = failureDetector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,12 @@ public class TimeSeriesRequestHandler extends BaseBrokerRequestHandler {
private final TimeSeriesQueryEnvironment _queryEnvironment;
private final QueryDispatcher _queryDispatcher;

public TimeSeriesRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
public TimeSeriesRequestHandler(PinotConfiguration config, String brokerId,
BrokerRequestIdGenerator requestIdGenerator, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
QueryDispatcher queryDispatcher, ThreadResourceUsageAccountant accountant) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, accountant);
super(config, brokerId, requestIdGenerator, routingManager, accessControlFactory, queryQuotaManager, tableCache,
accountant);
_queryEnvironment = new TimeSeriesQueryEnvironment(config, routingManager, tableCache);
_queryEnvironment.init(config);
_queryDispatcher = queryDispatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void testCancelQuery() {
new PinotConfiguration(Map.of(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION, "true"));
BrokerQueryEventListenerFactory.init(config);
BaseSingleStageBrokerRequestHandler requestHandler =
new BaseSingleStageBrokerRequestHandler(config, "testBrokerId", routingManager,
new BaseSingleStageBrokerRequestHandler(config, "testBrokerId", new BrokerRequestIdGenerator(), routingManager,
new AllowAllAccessControlFactory(), queryQuotaManager, tableCache,
new Tracing.DefaultThreadResourceUsageAccountant()) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ public void testLiteralOnlyWithAsBrokerRequestFromSQL() {
public void testBrokerRequestHandler()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
null, null, null, null, mock(ServerRoutingStatsManager.class), mock(FailureDetector.class),
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId",
new BrokerRequestIdGenerator(), null, ACCESS_CONTROL_FACTORY, null, null, null, null,
mock(ServerRoutingStatsManager.class), mock(FailureDetector.class),
new Tracing.DefaultThreadResourceUsageAccountant());

long randNum = RANDOM.nextLong();
Expand All @@ -196,8 +197,9 @@ null, null, null, null, mock(ServerRoutingStatsManager.class), mock(FailureDetec
public void testBrokerRequestHandlerWithAsFunction()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
null, null, null, null, mock(ServerRoutingStatsManager.class), mock(FailureDetector.class),
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId",
new BrokerRequestIdGenerator(), null, ACCESS_CONTROL_FACTORY, null, null, null, null,
mock(ServerRoutingStatsManager.class), mock(FailureDetector.class),
new Tracing.DefaultThreadResourceUsageAccountant());
long currentTsMin = System.currentTimeMillis();
BrokerResponse brokerResponse = requestHandler.handleRequest(
Expand Down Expand Up @@ -351,8 +353,9 @@ null, null, null, null, mock(ServerRoutingStatsManager.class), mock(FailureDetec
public void testExplainPlanLiteralOnly()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId", null, ACCESS_CONTROL_FACTORY,
null, null, null, null, mock(ServerRoutingStatsManager.class), mock(FailureDetector.class),
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), "testBrokerId",
new BrokerRequestIdGenerator(), null, ACCESS_CONTROL_FACTORY, null, null, null, null,
mock(ServerRoutingStatsManager.class), mock(FailureDetector.class),
new Tracing.DefaultThreadResourceUsageAccountant());

// Test 1: select constant
Expand Down