Skip to content

Commit

Permalink
Move brokerId extraction to BaseBrokerStarter (apache#9965)
Browse files Browse the repository at this point in the history
Co-authored-by: Jack Li(Analytics Engineering) <jlli@jlli-mn1.linkedin.biz>
  • Loading branch information
jackjlli and Jack Li(Analytics Engineering) authored Dec 12, 2022
1 parent 2253bd7 commit 9c64672
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -264,21 +265,22 @@ public void start()
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);

// Create Broker request handler.
String brokerId = _brokerConf.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
String brokerRequestHandlerType =
_brokerConf.getProperty(Broker.BROKER_REQUEST_HANDLER_TYPE, Broker.DEFAULT_BROKER_REQUEST_HANDLER_TYPE);
BrokerRequestHandler singleStageBrokerRequestHandler = null;
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
singleStageBrokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager,
new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache, _brokerMetrics, null);
} else { // default request handler type, e.g. netty
if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, tlsDefaults, _serverRoutingStatsManager);
} else {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory,
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null, _serverRoutingStatsManager);
}
}
Expand All @@ -289,11 +291,11 @@ public void start()
// worker requires both the "Netty port" for protocol transport; and "GRPC port" for mailbox transport.
// TODO: decouple protocol and engine selection.
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache, _brokerMetrics);
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics);
}

_brokerRequestHandler = new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler,
_brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId, singleStageBrokerRequestHandler,
multiStageBrokerRequestHandler, _brokerMetrics);
_brokerRequestHandler.start();
String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
Expand Down Expand Up @@ -433,6 +435,15 @@ private void registerServiceStatusHandler() {
new ServiceStatus.LifecycleServiceStatusCallback(this::isStarting, this::isShuttingDown))));
}

private String getDefaultBrokerId() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
LOGGER.error("Caught exception while getting default broker Id", e);
return "";
}
}

@Override
public void stop() {
LOGGER.info("Shutting down Pinot broker");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -129,9 +128,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
private final boolean _enableDistinctCountBitmapOverride;
private final Map<Long, QueryServers> _queriesById;

public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
public BaseBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics) {
_brokerId = brokerId;
_config = config;
_routingManager = routingManager;
_accessControlFactory = accessControlFactory;
Expand All @@ -146,7 +146,6 @@ public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager
_enableDistinctCountBitmapOverride =
_config.getProperty(CommonConstants.Helix.ENABLE_DISTINCT_COUNT_BITMAP_OVERRIDE_KEY, false);

_brokerId = config.getProperty(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
_brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
_queryResponseLimit =
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
Expand All @@ -160,15 +159,6 @@ public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager
_queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(), enableQueryCancellation);
}

private String getDefaultBrokerId() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
LOGGER.error("Caught exception while getting default broker Id", e);
return "";
}
}

@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
Expand Down Expand Up @@ -240,7 +230,6 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
throws Exception {
long requestId = _requestIdGenerator.incrementAndGet();
requestContext.setBrokerId(_brokerId);
requestContext.setRequestId(requestId);
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
private final BrokerRequestHandler _singleStageBrokerRequestHandler;
private final BrokerRequestHandler _multiStageWorkerRequestHandler;
private final BrokerMetrics _brokerMetrics;
private final String _brokerId;

public BrokerRequestHandlerDelegate(BrokerRequestHandler singleStageBrokerRequestHandler,
public BrokerRequestHandlerDelegate(String brokerId, BrokerRequestHandler singleStageBrokerRequestHandler,
@Nullable BrokerRequestHandler multiStageWorkerRequestHandler, BrokerMetrics brokerMetrics) {
_brokerId = brokerId;
_singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
_multiStageWorkerRequestHandler = multiStageWorkerRequestHandler;
_brokerMetrics = brokerMetrics;
Expand Down Expand Up @@ -81,6 +83,7 @@ public void shutDown() {
public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
throws Exception {
requestContext.setBrokerId(_brokerId);
if (sqlNodeAndOptions == null) {
try {
sqlNodeAndOptions = RequestUtils.parseQuery(request.get(CommonConstants.Broker.Request.SQL).asText(), request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
private final PinotStreamingQueryClient _streamingQueryClient;

// TODO: Support TLS
public GrpcBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics, TlsConfig tlsConfig) {
super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
LOGGER.info("Using Grpc BrokerRequestHandler.");
_grpcConfig = GrpcConfig.buildGrpcQueryConfig(config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,16 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private final QueryEnvironment _queryEnvironment;
private final QueryDispatcher _queryDispatcher;

public MultiStageBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics) {
super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerIdFromConfig,
BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics) {
super(config, brokerIdFromConfig, routingManager, accessControlFactory, queryQuotaManager, tableCache,
brokerMetrics);
LOGGER.info("Using Multi-stage BrokerRequestHandler.");
String reducerHostname = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME);
if (reducerHostname == null) {
// use broker ID as host name, but remove the
String brokerId = config.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ID);
String brokerId = brokerIdFromConfig;
brokerId = brokerId.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) ? brokerId.substring(
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : brokerId;
brokerId = StringUtils.split(brokerId, "_").length > 1 ? StringUtils.split(brokerId, "_")[0] : brokerId;
Expand Down Expand Up @@ -108,7 +109,6 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
throws Exception {
long requestId = _requestIdGenerator.incrementAndGet();
requestContext.setBrokerId(_brokerId);
requestContext.setRequestId(requestId);
requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
private final QueryRouter _queryRouter;
private final FailureDetector _failureDetector;

public SingleConnectionBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
BrokerMetrics brokerMetrics, NettyConfig nettyConfig, TlsConfig tlsConfig,
ServerRoutingStatsManager serverRoutingStatsManager) {
super(config, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
public SingleConnectionBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
QueryQuotaManager queryQuotaManager, TableCache tableCache, BrokerMetrics brokerMetrics, NettyConfig nettyConfig,
TlsConfig tlsConfig, ServerRoutingStatsManager serverRoutingStatsManager) {
super(config, brokerId, routingManager, accessControlFactory, queryQuotaManager, tableCache, brokerMetrics);
LOGGER.info("Using Netty BrokerRequestHandler.");

_brokerReduceService = new BrokerReduceService(_config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void testCancelQuery()
PinotConfiguration config =
new PinotConfiguration(Collections.singletonMap("pinot.broker.enable.query.cancellation", "true"));
BaseBrokerRequestHandler requestHandler =
new BaseBrokerRequestHandler(config, routingManager, new AllowAllAccessControlFactory(),
new BaseBrokerRequestHandler(config, null, routingManager, new AllowAllAccessControlFactory(),
queryQuotaManager, tableCache,
new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet())) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ public void testLiteralOnlyWithAsBrokerRequestFromSQL() {
public void testBrokerRequestHandler()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, ACCESS_CONTROL_FACTORY, null, null,
new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
null, null, mock(ServerRoutingStatsManager.class));
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
null, mock(ServerRoutingStatsManager.class));

long randNum = RANDOM.nextLong();
byte[] randBytes = new byte[12];
Expand All @@ -209,9 +209,9 @@ public void testBrokerRequestHandler()
public void testBrokerRequestHandlerWithAsFunction()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, ACCESS_CONTROL_FACTORY, null, null,
new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
null, null, mock(ServerRoutingStatsManager.class));
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
null, mock(ServerRoutingStatsManager.class));
long currentTsMin = System.currentTimeMillis();
JsonNode request = JsonUtils.stringToJsonNode(
"{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}");
Expand Down Expand Up @@ -416,9 +416,9 @@ public void testBrokerRequestHandlerWithAsFunction()
public void testExplainPlanLiteralOnly()
throws Exception {
SingleConnectionBrokerRequestHandler requestHandler =
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, ACCESS_CONTROL_FACTORY, null, null,
new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()),
null, null, mock(ServerRoutingStatsManager.class));
new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, ACCESS_CONTROL_FACTORY, null,
null, new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet()), null,
null, mock(ServerRoutingStatsManager.class));

// Test 1: select constant
JsonNode request = JsonUtils.stringToJsonNode("{\"sql\":\"EXPLAIN PLAN FOR SELECT 1.5, 'test'\"}");
Expand Down

0 comments on commit 9c64672

Please sign in to comment.