Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.spi.utils.JsonUtils;
Expand Down Expand Up @@ -178,8 +179,14 @@ private String getMultiStageQueryResponse(String query, String queryOptions, Htt
new Exception("Unable to find table name from SQL thus cannot dispatch to broker.")).toString();
}

String brokerTenant = getCommonBrokerTenant(tableNames);
String serverTenant = getCommonServerTenant(tableNames);
List<TableConfig> tableConfigList = getListTableConfigs(tableNames);
if (tableConfigList == null || tableConfigList.size() == 0) {
return QueryException.getException(QueryException.BROKER_RESOURCE_MISSING_ERROR, new Exception(
"Unable to find table in cluster")).toString();
}

String brokerTenant = getCommonBrokerTenant(tableConfigList);
String serverTenant = getCommonServerTenant(tableConfigList);
if (brokerTenant == null || serverTenant == null) {
return QueryException.getException(QueryException.BROKER_REQUEST_SEND_ERROR,
new Exception(String.format("Unable to dispatch multistage query with multiple tables : %s "
Expand Down Expand Up @@ -239,37 +246,42 @@ private String getQueryResponse(String query, @Nullable SqlNode sqlNode, String
return sendRequestToBroker(query, instanceId, traceEnabled, queryOptions, httpHeaders);
}

// return the brokerTenant if all tables point to the same broker, else returns null
private String getCommonBrokerTenant(List<String> tableNames) {
Set<String> tableBrokers = new HashSet<>();
for (String tableName : tableNames) {
// given a list of tables, returns the list of tableConfigs
private List<TableConfig> getListTableConfigs(List<String> tableNames) {
List<TableConfig> allTableConfigList = new ArrayList<>();
for (String tableName: tableNames) {
List<TableConfig> tableConfigList = new ArrayList<>();
if (_pinotHelixResourceManager.hasRealtimeTable(tableName)) {
tableBrokers.add(Objects.requireNonNull(_pinotHelixResourceManager.getRealtimeTableConfig(tableName))
.getTenantConfig().getBroker());
tableConfigList.add(Objects.requireNonNull(_pinotHelixResourceManager.getRealtimeTableConfig(tableName)));
}
if (_pinotHelixResourceManager.hasOfflineTable(tableName)) {
tableBrokers.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(tableName))
.getTenantConfig().getBroker());
tableConfigList.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(tableName)));
}
if (tableConfigList.size() == 0) {
return null;
}
allTableConfigList.addAll(tableConfigList);
}
return allTableConfigList;
}

// return the brokerTenant if all table configs point to the same broker, else returns null
private String getCommonBrokerTenant(List<TableConfig> tableConfigList) {
Set<String> tableBrokers = new HashSet<>();
for (TableConfig tableConfig : tableConfigList) {
tableBrokers.add(tableConfig.getTenantConfig().getBroker());
}
if (tableBrokers.size() != 1) {
return null;
}
return (String) (tableBrokers.toArray()[0]);
}

// return the serverTenant if all tables point to the same broker, else returns null
private String getCommonServerTenant(List<String> tableNames) {
// return the serverTenant if all table configs point to the same server, else returns null
private String getCommonServerTenant(List<TableConfig> tableConfigList) {
Set<String> tableServers = new HashSet<>();
for (String tableName : tableNames) {
if (_pinotHelixResourceManager.hasRealtimeTable(tableName)) {
tableServers.add(Objects.requireNonNull(_pinotHelixResourceManager.getRealtimeTableConfig(tableName))
.getTenantConfig().getServer());
}
if (_pinotHelixResourceManager.hasOfflineTable(tableName)) {
tableServers.add(Objects.requireNonNull(_pinotHelixResourceManager.getOfflineTableConfig(tableName))
.getTenantConfig().getServer());
}
for (TableConfig tableConfig : tableConfigList) {
tableServers.add(tableConfig.getTenantConfig().getServer());
}
if (tableServers.size() != 1) {
return null;
Expand Down