Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
89d019f
[multistage] support database
shounakmk219 Mar 7, 2024
9d91b32
lint fix
shounakmk219 Mar 7, 2024
1c0b240
Move isPartOfDatabase utility to DatabaseUtils
shounakmk219 Mar 15, 2024
98195f4
Reuse utils and constants
shounakmk219 Mar 15, 2024
246a9b3
lint fix
shounakmk219 Mar 15, 2024
8c4649f
Fix test case
shounakmk219 Mar 15, 2024
84fcbde
review fixes
shounakmk219 Mar 18, 2024
3b3b87e
[multistage] support database
shounakmk219 Mar 7, 2024
ed25b48
lint fix
shounakmk219 Mar 7, 2024
62dd3fd
Move isPartOfDatabase utility to DatabaseUtils
shounakmk219 Mar 15, 2024
86007f9
Reuse utils and constants
shounakmk219 Mar 15, 2024
fc6bf1d
lint fix
shounakmk219 Mar 15, 2024
601b896
Fix test case
shounakmk219 Mar 15, 2024
96cee48
review fixes
shounakmk219 Mar 18, 2024
345cc0e
keep separate tests for all database context checks
shounakmk219 Mar 19, 2024
72520e1
Merge branch 'db-support-in-v2-engine' of https://github.com/shounakm…
shounakmk219 Mar 20, 2024
49d6c7f
remove subCatalog and use root catalog itself for query specific data…
shounakmk219 Mar 20, 2024
c0aa537
Merge remote-tracking branch 'upstream/master' into db-support-in-v2-…
shounakmk219 Mar 20, 2024
74d9bf6
table cache mock fix
shounakmk219 Mar 20, 2024
c1f67c0
Bugfix. Table name extraction from TableCache
shounakmk219 Mar 21, 2024
929bced
Use "default" database context when no context is passed
shounakmk219 Mar 21, 2024
724f392
cosmetic fixes
shounakmk219 Mar 21, 2024
cdae3c0
Fix the expected explain plan with default path
shounakmk219 Mar 21, 2024
7bd212c
Add cross database query test
shounakmk219 Mar 21, 2024
ae66547
Add more tests for database context from header
shounakmk219 Mar 21, 2024
071757d
fix missed LogicalTableScan content updates
shounakmk219 Mar 21, 2024
2c61d5b
typo
shounakmk219 Mar 21, 2024
de836ed
Merge remote-tracking branch 'upstream/master' into db-support-in-v2-…
shounakmk219 Mar 21, 2024
3294fcf
test fix
shounakmk219 Mar 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.ExceptionUtils;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
Expand Down Expand Up @@ -81,7 +82,7 @@
public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageBrokerRequestHandler.class);

private final QueryEnvironment _queryEnvironment;
private final WorkerManager _workerManager;
private final MailboxService _mailboxService;
private final QueryDispatcher _queryDispatcher;

Expand All @@ -93,9 +94,7 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
LOGGER.info("Using Multi-stage BrokerRequestHandler.");
String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
_queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
new WorkerManager(hostname, port, routingManager), _tableCache);
_workerManager = new WorkerManager(hostname, port, _routingManager);
_mailboxService = new MailboxService(hostname, port, config);
_queryDispatcher = new QueryDispatcher(_mailboxService);

Expand Down Expand Up @@ -126,11 +125,15 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
try {
Long timeoutMsFromQueryOption = QueryOptionsUtils.getTimeoutMs(sqlNodeAndOptions.getOptions());
queryTimeoutMs = timeoutMsFromQueryOption == null ? _brokerTimeoutMs : timeoutMsFromQueryOption;
String database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders);
// Compile the request
compilationStartTimeNs = System.nanoTime();
QueryEnvironment queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
CalciteSchemaBuilder.asRootSchema(new PinotCatalog(database, _tableCache), database), _workerManager,
_tableCache);
switch (sqlNodeAndOptions.getSqlNode().getKind()) {
case EXPLAIN:
queryPlanResult = _queryEnvironment.explainQuery(query, sqlNodeAndOptions, requestId);
queryPlanResult = queryEnvironment.explainQuery(query, sqlNodeAndOptions, requestId);
String plan = queryPlanResult.getExplainPlan();
Set<String> tableNames = queryPlanResult.getTableNames();
if (!hasTableAccess(requesterIdentity, tableNames, requestContext, httpHeaders)) {
Expand All @@ -140,7 +143,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
return constructMultistageExplainPlan(query, plan);
case SELECT:
default:
queryPlanResult = _queryEnvironment.planQuery(query, sqlNodeAndOptions, requestId);
queryPlanResult = queryEnvironment.planQuery(query, sqlNodeAndOptions, requestId);
break;
}
} catch (WebApplicationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pinot.common.utils;

import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -84,4 +86,48 @@ public static String translateTableName(String tableName, HttpHeaders headers, b
/**
 * Convenience overload of {@code translateTableName(String, HttpHeaders, boolean)} that passes
 * {@code false} for the third (boolean) argument.
 * <p>NOTE(review): the flag's exact semantics are defined by the three-arg overload — confirm there.
 *
 * @param tableName table name from the request
 * @param headers http headers carrying the database context
 * @return the translated table name
 */
public static String translateTableName(String tableName, HttpHeaders headers) {
return translateTableName(tableName, headers, false);
}

/**
 * Determines whether the fully qualified {@code tableName} belongs to {@code databaseName}.
 *
 * @param tableName fully qualified table name (never carries the implicit "default." prefix)
 * @param databaseName database name, may be null
 * @return {@code true} when either
 *         <ul>
 *           <li>{@code tableName} starts with {@code databaseName + "."}, or</li>
 *           <li>{@code databaseName} is null/empty/"default" and {@code tableName} contains no '.'</li>
 *         </ul>
 *         {@code false} otherwise
 */
public static boolean isPartOfDatabase(String tableName, @Nullable String databaseName) {
  // tableName is assumed to never be prefixed with the default database ("default.")
  boolean defaultDatabase =
      StringUtils.isEmpty(databaseName) || CommonConstants.DEFAULT_DATABASE.equalsIgnoreCase(databaseName);
  if (defaultDatabase) {
    // tables in the default database are unprefixed, so any '.' means another database
    return !tableName.contains(".");
  }
  return tableName.startsWith(databaseName + ".");
}

/**
 * Resolves the database context of a request from its query options and HTTP headers.
 *
 * @param queryOptions query options from the request, may be null
 * @param headers http headers from the request, may be null
 * @return the resolved database name; {@link CommonConstants#DEFAULT_DATABASE} when neither
 *         source provides one
 * @throws IllegalArgumentException when the two sources provide conflicting database names
 */
public static String extractDatabaseFromQueryRequest(
    @Nullable Map<String, String> queryOptions, @Nullable HttpHeaders headers) {
  String fromOptions = queryOptions != null ? queryOptions.get(CommonConstants.DATABASE) : null;
  String fromHeaders = headers != null ? headers.getHeaderString(CommonConstants.DATABASE) : null;
  if (fromOptions != null && fromHeaders != null) {
    // both sources present — they must agree
    Preconditions.checkArgument(fromOptions.equals(fromHeaders),
        "Database context mismatch : from headers %s, from query options %s", fromHeaders, fromOptions);
  }
  // header value takes precedence when only one source is set
  String resolved = fromHeaders != null ? fromHeaders : fromOptions;
  return Objects.requireNonNullElse(resolved, CommonConstants.DEFAULT_DATABASE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.controller.ControllerConf;
Expand Down Expand Up @@ -205,8 +206,14 @@ private String getMultiStageQueryResponse(String query, String queryOptions, Htt
throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
}

Map<String, String> queryOptionsMap = RequestUtils.parseQuery(query).getOptions();
if (queryOptions != null) {
queryOptionsMap.putAll(RequestUtils.getOptionsFromString(queryOptions));
}
String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptionsMap, httpHeaders);
QueryEnvironment queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
CalciteSchemaBuilder.asRootSchema(new PinotCatalog(_pinotHelixResourceManager.getTableCache())), null, null);
CalciteSchemaBuilder.asRootSchema(new PinotCatalog(database, _pinotHelixResourceManager.getTableCache()),
database), null, null);
List<String> tableNames;
try {
tableNames = queryEnvironment.getTableNamesForQuery(query);
Expand Down Expand Up @@ -242,6 +249,11 @@ private String getQueryResponse(String query, @Nullable SqlNode sqlNode, String
HttpHeaders httpHeaders) {
// Get resource table name.
String tableName;
Map<String, String> queryOptionsMap = RequestUtils.parseQuery(query).getOptions();
if (queryOptions != null) {
queryOptionsMap.putAll(RequestUtils.getOptionsFromString(queryOptions));
}
String database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptionsMap, httpHeaders);
try {
String inputTableName =
sqlNode != null ? RequestUtils.getTableNames(CalciteSqlParser.compileSqlNodeToPinotQuery(sqlNode)).iterator()
Expand All @@ -254,8 +266,8 @@ private String getQueryResponse(String query, @Nullable SqlNode sqlNode, String
// try to compile the query using multi-stage engine and suggest using it if it succeeds.
LOGGER.info("Trying to compile query {} using multi-stage engine", query);
QueryEnvironment queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
CalciteSchemaBuilder.asRootSchema(new PinotCatalog(_pinotHelixResourceManager.getTableCache())), null,
null);
CalciteSchemaBuilder.asRootSchema(new PinotCatalog(database, _pinotHelixResourceManager.getTableCache()),
database), null, null);
queryEnvironment.getTableNamesForQuery(query);
LOGGER.info("Successfully compiled query using multi-stage engine: {}", query);
return QueryException.getException(QueryException.SQL_PARSING_ERROR, new Exception(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,22 +732,14 @@ public List<String> getAllTables() {
public List<String> getAllTables(@Nullable String databaseName) {
List<String> tableNames = new ArrayList<>();
for (String resourceName : getAllResources()) {
if (TableNameBuilder.isTableResource(resourceName) && isPartOfDatabase(resourceName, databaseName)) {
if (TableNameBuilder.isTableResource(resourceName)
&& DatabaseUtils.isPartOfDatabase(resourceName, databaseName)) {
tableNames.add(resourceName);
}
}
return tableNames;
}

private boolean isPartOfDatabase(String tableName, @Nullable String databaseName) {
// assumes tableName will not have default database prefix ('default.')
if (StringUtils.isEmpty(databaseName) || databaseName.equalsIgnoreCase(CommonConstants.DEFAULT_DATABASE)) {
return StringUtils.split(tableName, '.').length == 1;
} else {
return tableName.startsWith(databaseName + ".");
}
}

/**
* Get all offline table names from default database.
*
Expand All @@ -766,7 +758,8 @@ public List<String> getAllOfflineTables() {
public List<String> getAllOfflineTables(@Nullable String databaseName) {
List<String> offlineTableNames = new ArrayList<>();
for (String resourceName : getAllResources()) {
if (isPartOfDatabase(resourceName, databaseName) && TableNameBuilder.isOfflineTableResource(resourceName)) {
if (DatabaseUtils.isPartOfDatabase(resourceName, databaseName)
&& TableNameBuilder.isOfflineTableResource(resourceName)) {
offlineTableNames.add(resourceName);
}
}
Expand All @@ -790,7 +783,7 @@ public List<String> getAllDimensionTables() {
*/
public List<String> getAllDimensionTables(@Nullable String databaseName) {
return _tableCache.getAllDimensionTables().stream()
.filter(table -> isPartOfDatabase(table, databaseName))
.filter(table -> DatabaseUtils.isPartOfDatabase(table, databaseName))
.collect(Collectors.toList());
}

Expand All @@ -812,7 +805,8 @@ public List<String> getAllRealtimeTables() {
public List<String> getAllRealtimeTables(@Nullable String databaseName) {
List<String> realtimeTableNames = new ArrayList<>();
for (String resourceName : getAllResources()) {
if (isPartOfDatabase(resourceName, databaseName) && TableNameBuilder.isRealtimeTableResource(resourceName)) {
if (DatabaseUtils.isPartOfDatabase(resourceName, databaseName)
&& TableNameBuilder.isRealtimeTableResource(resourceName)) {
realtimeTableNames.add(resourceName);
}
}
Expand All @@ -837,7 +831,8 @@ public List<String> getAllRawTables() {
public List<String> getAllRawTables(@Nullable String databaseName) {
Set<String> rawTableNames = new HashSet<>();
for (String resourceName : getAllResources()) {
if (TableNameBuilder.isTableResource(resourceName) && isPartOfDatabase(resourceName, databaseName)) {
if (TableNameBuilder.isTableResource(resourceName)
&& DatabaseUtils.isPartOfDatabase(resourceName, databaseName)) {
rawTableNames.add(TableNameBuilder.extractRawTableName(resourceName));
}
}
Expand Down Expand Up @@ -1616,7 +1611,7 @@ public List<String> getSchemaNames(@Nullable String databaseName) {
List<String> schemas = _propertyStore.getChildNames(
PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(), AccessOption.PERSISTENT);
if (schemas != null) {
return schemas.stream().filter(schemaName -> isPartOfDatabase(schemaName, databaseName))
return schemas.stream().filter(schemaName -> DatabaseUtils.isPartOfDatabase(schemaName, databaseName))
.collect(Collectors.toList());
}
return Collections.emptyList();
Expand Down Expand Up @@ -4011,7 +4006,7 @@ public Map<String, List<InstanceInfo>> getTableToLiveBrokersMapping(@Nullable St
ZNRecord znRecord = ev.getRecord();
for (Map.Entry<String, Map<String, String>> tableToBrokersEntry : znRecord.getMapFields().entrySet()) {
String tableName = tableToBrokersEntry.getKey();
if (!isPartOfDatabase(tableName, databaseName)) {
if (!DatabaseUtils.isPartOfDatabase(tableName, databaseName)) {
continue;
}
Map<String, String> brokersToState = tableToBrokersEntry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,15 @@ protected JsonNode postQuery(String query)
getExtraQueryProperties());
}

/**
 * Queries the broker's sql query endpoint (/query/sql), attaching the given HTTP headers
 * (e.g. a database context header) and the test's extra query properties.
 *
 * @param query SQL query to post
 * @param headers HTTP headers to send with the request
 * @return the broker response parsed as JSON
 * @throws Exception on request failure
 */
protected JsonNode postQuery(String query, Map<String, String> headers)
throws Exception {
return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(), useMultiStageQueryEngine()), headers,
getExtraQueryProperties());
}

protected Map<String, String> getExtraQueryProperties() {
return Collections.emptyMap();
}
Expand Down
Loading