Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,10 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentit
}
}
updateQuerySource(brokerRequest);
if (_enableCaseInsensitive) {
try {
handleCaseSensitivity(brokerRequest);
} catch (Exception e) {
LOGGER.warn("Caught exception while rewriting PQL to make it case-insensitive {}: {}, {}", requestId, query, e);
}
try {
updateColumnNames(brokerRequest);
} catch (Exception e) {
LOGGER.warn("Caught exception while updating Column names in Query {}: {}, {}", requestId, query, e);
}
if (_defaultHllLog2m > 0) {
handleHyperloglogLog2mOverride(brokerRequest, _defaultHllLog2m);
Expand Down Expand Up @@ -450,18 +448,22 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentit
private void updateQuerySource(BrokerRequest brokerRequest) {
String tableName = brokerRequest.getQuerySource().getTableName();
// Check if table is in the format of [database_name].[table_name]
String[] tableNameSplits = StringUtils.split(tableName, '.');
if (tableNameSplits.length != 2) {
return;
}
String[] tableNameSplits = StringUtils.split(tableName, ".", 2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't the Calcite parser support getting column names from the [table].[column] format? If it does, we should just use that instead of post-processing here. We have been incrementally adding small per-query string manipulations, and I fear they will soon add up to a significant performance cost.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calcite doesn't support parsing it as [table].[column]; we get the whole string back as a single identifier.
So we need to handle that ourselves.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Calcite doesn't support it, then should we support it? The logic seems brittle anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should. The use case is integration with other systems such as Tableau.

// Update table name if there is no existing table in the format of [database_name].[table_name] but only [table_name]
if (_enableCaseInsensitive) {
if (tableNameSplits.length < 2) {
brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableName));
return;
}
if (_tableCache.containsTable(tableNameSplits[1]) && !_tableCache.containsTable(tableName)) {
// Use TableCache to check case insensitive table name.
brokerRequest.getQuerySource().setTableName(tableNameSplits[1]);
brokerRequest.getQuerySource().setTableName(_tableCache.getActualTableName(tableNameSplits[1]));
}
return;
}
if (tableNameSplits.length < 2) {
return;
}
// Use RoutingManager to check case sensitive table name.
if (TableNameBuilder.isTableResource(tableName)) {
if (_routingManager.routingExists(tableNameSplits[1]) && !_routingManager.routingExists(tableName)) {
Expand Down Expand Up @@ -667,19 +669,17 @@ private void computeResultsForLiteral(Literal literal, List<String> columnNames,
}

/**
* Fixes the case-insensitive column names to the actual column names in the given broker request.
* Fixes the column names to the actual column names in the given broker request.
*/
private void handleCaseSensitivity(BrokerRequest brokerRequest) {
String inputTableName = brokerRequest.getQuerySource().getTableName();
String actualTableName = _tableCache.getActualTableName(inputTableName);
brokerRequest.getQuerySource().setTableName(actualTableName);
private void updateColumnNames(BrokerRequest brokerRequest) {
String tableName = brokerRequest.getQuerySource().getTableName();
//fix columns
if (brokerRequest.getFilterSubQueryMap() != null) {
Collection<FilterQuery> values = brokerRequest.getFilterSubQueryMap().getFilterQueryMap().values();
for (FilterQuery filterQuery : values) {
if (filterQuery.getNestedFilterQueryIdsSize() == 0) {
String expression = filterQuery.getColumn();
filterQuery.setColumn(fixColumnNameCase(actualTableName, expression));
filterQuery.setColumn(fixColumnName(tableName, expression));
}
}
}
Expand All @@ -688,14 +688,14 @@ private void handleCaseSensitivity(BrokerRequest brokerRequest) {
if (!info.getAggregationType().equalsIgnoreCase(AggregationFunctionType.COUNT.getName())) {
// Always read from backward compatible api in AggregationFunctionUtils.
List<String> arguments = AggregationFunctionUtils.getArguments(info);
arguments.replaceAll(e -> fixColumnNameCase(actualTableName, e));
arguments.replaceAll(e -> fixColumnName(tableName, e));
info.setExpressions(arguments);
}
}
if (brokerRequest.isSetGroupBy()) {
List<String> expressions = brokerRequest.getGroupBy().getExpressions();
for (int i = 0; i < expressions.size(); i++) {
expressions.set(i, fixColumnNameCase(actualTableName, expressions.get(i)));
expressions.set(i, fixColumnName(tableName, expressions.get(i)));
}
}
} else {
Expand All @@ -704,74 +704,94 @@ private void handleCaseSensitivity(BrokerRequest brokerRequest) {
for (int i = 0; i < selectionColumns.size(); i++) {
String expression = selectionColumns.get(i);
if (!expression.equals("*")) {
selectionColumns.set(i, fixColumnNameCase(actualTableName, expression));
selectionColumns.set(i, fixColumnName(tableName, expression));
}
}
}
if (brokerRequest.isSetOrderBy()) {
List<SelectionSort> orderBy = brokerRequest.getOrderBy();
for (SelectionSort selectionSort : orderBy) {
String expression = selectionSort.getColumn();
selectionSort.setColumn(fixColumnNameCase(actualTableName, expression));
selectionSort.setColumn(fixColumnName(tableName, expression));
}
}

PinotQuery pinotQuery = brokerRequest.getPinotQuery();
if (pinotQuery != null) {
pinotQuery.getDataSource().setTableName(actualTableName);
pinotQuery.getDataSource().setTableName(tableName);
for (Expression expression : pinotQuery.getSelectList()) {
fixColumnNameCase(actualTableName, expression);
fixColumnName(tableName, expression);
}
Expression filterExpression = pinotQuery.getFilterExpression();
if (filterExpression != null) {
fixColumnNameCase(actualTableName, filterExpression);
fixColumnName(tableName, filterExpression);
}
List<Expression> groupByList = pinotQuery.getGroupByList();
if (groupByList != null) {
for (Expression expression : groupByList) {
fixColumnNameCase(actualTableName, expression);
fixColumnName(tableName, expression);
}
}
List<Expression> orderByList = pinotQuery.getOrderByList();
if (orderByList != null) {
for (Expression expression : orderByList) {
fixColumnNameCase(actualTableName, expression);
fixColumnName(tableName, expression);
}
}
Expression havingExpression = pinotQuery.getHavingExpression();
if (havingExpression != null) {
fixColumnNameCase(actualTableName, havingExpression);
fixColumnName(tableName, havingExpression);
}
}
}

private String fixColumnNameCase(String tableNameWithType, String expression) {
private String fixColumnName(String tableNameWithType, String expression) {
TransformExpressionTree expressionTree = TransformExpressionTree.compileToExpressionTree(expression);
fixColumnNameCase(tableNameWithType, expressionTree);
fixColumnName(tableNameWithType, expressionTree);
return expressionTree.toString();
}

private void fixColumnNameCase(String tableNameWithType, TransformExpressionTree expression) {
private void fixColumnName(String tableNameWithType, TransformExpressionTree expression) {
TransformExpressionTree.ExpressionType expressionType = expression.getExpressionType();
if (expressionType == TransformExpressionTree.ExpressionType.IDENTIFIER) {
expression.setValue(_tableCache.getActualColumnName(tableNameWithType, expression.getValue()));
String identifier = expression.getValue();
expression.setValue(getActualColumnName(tableNameWithType, identifier));
} else if (expressionType == TransformExpressionTree.ExpressionType.FUNCTION) {
for (TransformExpressionTree child : expression.getChildren()) {
fixColumnNameCase(tableNameWithType, child);
fixColumnName(tableNameWithType, child);
}
}
}

private void fixColumnNameCase(String tableNameWithType, Expression expression) {
private void fixColumnName(String tableNameWithType, Expression expression) {
ExpressionType expressionType = expression.getType();
if (expressionType == ExpressionType.IDENTIFIER) {
Identifier identifier = expression.getIdentifier();
identifier.setName(_tableCache.getActualColumnName(tableNameWithType, identifier.getName()));
identifier.setName(getActualColumnName(tableNameWithType, identifier.getName()));
} else if (expressionType == ExpressionType.FUNCTION) {
for (Expression operand : expression.getFunctionCall().getOperands()) {
fixColumnNameCase(tableNameWithType, operand);
fixColumnName(tableNameWithType, operand);
}
}
}

/**
 * Resolves a column identifier that may be written as {@code [table_name].[column_name]}.
 *
 * <p>The identifier is split into at most two parts around {@code '.'}. When there are two parts
 * and the first one matches the raw table name extracted from {@code tableNameWithType}, the
 * table prefix is dropped. The match ignores case only when case-insensitive mode is enabled.
 *
 * <p>In case-insensitive mode the (possibly stripped) name is always passed through the table
 * cache; otherwise it is returned as is.
 *
 * @param tableNameWithType table name the column is expected to belong to
 * @param columnName column identifier as it appears in the query
 * @return the column name to use for the query
 */
private String getActualColumnName(String tableNameWithType, String columnName) {
  String[] parts = StringUtils.split(columnName, ".", 2);
  boolean hasPrefix = parts.length == 2;

  if (!_enableCaseInsensitive) {
    // Case-sensitive mode: only strip an exactly matching table prefix, no cache lookup.
    boolean prefixIsTable =
        hasPrefix && TableNameBuilder.extractRawTableName(tableNameWithType).equals(parts[0]);
    return prefixIsTable ? parts[1] : columnName;
  }

  // Case-insensitive mode: strip a matching table prefix (ignoring case), then let the table
  // cache map whatever is left to the actual column name.
  String lookupName = columnName;
  if (hasPrefix && TableNameBuilder.extractRawTableName(tableNameWithType).equalsIgnoreCase(parts[0])) {
    lookupName = parts[1];
  }
  return _tableCache.getActualColumnName(tableNameWithType, lookupName);
}

private static Map<String, String> getOptionsFromJson(JsonNode request, String optionsKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1159,24 +1159,71 @@ public void testCaseInsensitivity() {
"SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
"SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
List<String> queries = new ArrayList<>();
baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
baseQueries.stream().forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));
baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "DAYSSinceEpOch")));

// Wait for at most 10 seconds for broker to get the ZK callback of the schema change
TestUtils.waitForCondition(aVoid -> {
for (String query : queries) {
try {
for (String query : queries) {
JsonNode response = postQuery(query);
// NOTE: When table does not exist, we will get 'BrokerResourceMissingError'.
// When column does not exist, all segments will be pruned and 'numSegmentsProcessed' will be 0.
return response.get("exceptions").size() == 0 && response.get("numSegmentsProcessed").asInt() > 0;
}
postQuery(query);
} catch (Exception e) {
// Fail the test when exception caught
throw new RuntimeException(e);
throw new RuntimeException("Got Exceptions from query - " + query);
}
}
}

@Test
public void testColumnNameContainsTableName() {
int daysSinceEpoch = 16138;
long secondsSinceEpoch = 16138 * 24 * 60 * 60;
List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000",
"SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
"SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch,
"SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
"SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
"SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
"SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
List<String> queries = new ArrayList<>();
baseQueries.forEach(q -> queries.add(q.replace("DaysSinceEpoch", "mytable.DAYSSinceEpOch")));
baseQueries.forEach(q -> queries.add(q.replace("DaysSinceEpoch", "mytable.DAYSSinceEpOch")));

for (String query : queries) {
try {
postQuery(query);
} catch (Exception e) {
// Fail the test when exception caught
throw new RuntimeException("Got Exceptions from query - " + query);
}
return true;
}, 10_000L, "Failed to get results for case-insensitive queries");
}
}

/**
 * Posts queries in which both the table name and the table-qualified column name use a different
 * case than the base query ({@code MYTABLE} / {@code MYDB.MYTABLE} and
 * {@code MYTABLE.DAYSSinceEpOch}).
 *
 * <p>The test only fails when {@code postQuery} throws; the content of the response is not
 * inspected here.
 */
@Test
public void testCaseInsensitivityWithColumnNameContainsTableName() {
  int daysSinceEpoch = 16138;
  // Multiply as long so the conversion to seconds can never overflow int arithmetic.
  long secondsSinceEpoch = daysSinceEpoch * 24L * 60 * 60;
  List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
      "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
      "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000",
      "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
      "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch,
      "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
      "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
      "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
      "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
  List<String> queries = new ArrayList<>();
  // Variant 1: upper-cased table name, column qualified with the upper-cased table name.
  baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));
  // Variant 2: same, but the table itself is additionally prefixed with a database name.
  baseQueries.forEach(q -> queries.add(q.replace("mytable", "MYDB.MYTABLE").replace("DaysSinceEpoch", "MYTABLE.DAYSSinceEpOch")));

  for (String query : queries) {
    try {
      postQuery(query);
    } catch (Exception e) {
      // Fail the test when exception caught; keep the original exception as the cause so the
      // underlying failure is visible in the test report.
      throw new RuntimeException("Got Exceptions from query - " + query, e);
    }
  }
}

@Test
Expand Down