Adding DML definition and parse SQL InsertFile (apache#8557)
* Add DML definition and SQL parsing

* Use MinionClient to execute async tasks

* Add integration tests
xiangfu0 authored and chengxuan.wang committed May 5, 2022
1 parent d282e45 commit 86dcc7d
Showing 20 changed files with 712 additions and 17 deletions.
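
The net effect of the change: the broker's SQL endpoint can now accept a DML statement and hand it off for asynchronous execution instead of treating every statement as a query. As a hedged illustration only, the sketch below submits an INSERT ... FROM FILE statement to a broker over HTTP; the statement text, the /query/sql path, and the default broker port 8099 are assumptions based on common Pinot setups and are not spelled out in this diff.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class InsertFromFileExample {
  public static void main(String[] args) throws Exception {
    // Illustrative DML statement; the exact grammar comes from the new SqlInsertFromFile parser.
    String body = "{\"sql\": \"INSERT INTO myTable FROM FILE 's3://my-bucket/path/to/data/'\"}";
    HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8099/query/sql"))   // assumed broker address and endpoint path
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
    // For DML the broker routes the request to SqlQueryExecutor, which submits an ad-hoc minion
    // task through the controller, so the response describes the task rather than query results.
    System.out.println(response.body());
  }
}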
PinotClientRequest.java
@@ -27,6 +27,8 @@
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.HashMap;
import java.util.Map;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -42,11 +44,17 @@
import org.apache.pinot.broker.api.HttpRequesterIdentity;
import org.apache.pinot.broker.api.RequestStatistics;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.PinotSqlType;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.glassfish.jersey.server.ManagedAsync;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +65,9 @@
public class PinotClientRequest {
private static final Logger LOGGER = LoggerFactory.getLogger(PinotClientRequest.class);

@Inject
SqlQueryExecutor _sqlQueryExecutor;

@Inject
private BrokerRequestHandler _requestHandler;

@@ -158,8 +169,7 @@ public void processSqlQueryGet(@ApiParam(value = "Query", required = true) @Quer
if (debugOptions != null) {
requestJson.put(Request.DEBUG_OPTIONS, debugOptions);
}
- BrokerResponse brokerResponse =
-     _requestHandler.handleRequest(requestJson, makeHttpIdentity(requestContext), new RequestStatistics());
+ BrokerResponse brokerResponse = executeSqlQuery(requestJson, makeHttpIdentity(requestContext), true);
asyncResponse.resume(brokerResponse.toJsonString());
} catch (Exception e) {
LOGGER.error("Caught exception while processing GET request", e);
@@ -187,8 +197,7 @@ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResp
String queryOptions = constructSqlQueryOptions();
// the only query options as of now are sql related. do not allow any custom query options in sql endpoint
ObjectNode sqlRequestJson = ((ObjectNode) requestJson).put(Request.QUERY_OPTIONS, queryOptions);
- BrokerResponse brokerResponse =
-     _requestHandler.handleRequest(sqlRequestJson, makeHttpIdentity(requestContext), new RequestStatistics());
+ BrokerResponse brokerResponse = executeSqlQuery(sqlRequestJson, makeHttpIdentity(requestContext), false);
asyncResponse.resume(brokerResponse.toJsonString());
} catch (Exception e) {
LOGGER.error("Caught exception while processing POST request", e);
@@ -197,6 +206,34 @@ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResp
}
}

private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterIdentity httpRequesterIdentity,
boolean onlyDql)
throws Exception {
SqlNodeAndOptions sqlNodeAndOptions;
try {
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlRequestJson.get(Request.SQL).asText());
} catch (Exception e) {
return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR, e));
}
PinotSqlType sqlType = CalciteSqlParser.extractSqlType(sqlNodeAndOptions.getSqlNode());
if (onlyDql && sqlType != PinotSqlType.DQL) {
return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR,
new UnsupportedOperationException("Unsupported SQL type - " + sqlType + ", GET API only supports DQL.")));
}
switch (sqlType) {
case DQL:
return _requestHandler.handleRequest(sqlRequestJson, httpRequesterIdentity, new RequestStatistics());
case DML:
Map<String, String> headers = new HashMap<>();
httpRequesterIdentity.getHttpHeaders().entries()
.forEach(entry -> headers.put(entry.getKey(), entry.getValue()));
return _sqlQueryExecutor.executeDMLStatement(sqlNodeAndOptions, headers);
default:
return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR,
new UnsupportedOperationException("Unsupported SQL type - " + sqlType)));
}
}

private String constructSqlQueryOptions() {
return Request.QueryOptionKey.GROUP_BY_MODE + "=" + Request.SQL + ";" + Request.QueryOptionKey.RESPONSE_FORMAT + "="
+ Request.SQL;
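
Worth noting from the executeSqlQuery() handler above: the GET endpoint passes onlyDql = true, so a DML statement arriving there is never dispatched. A minimal sketch of the response a client would get back in that case, built from the same QueryException and BrokerResponseNative calls used in the handler (the message string is only illustrative):

import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;

public class DmlOverGetRejectionExample {
  public static void main(String[] args) throws Exception {
    // Mirrors the onlyDql guard: the handler wraps an UnsupportedOperationException in a
    // parsing-error response and returns it to the client instead of running the statement.
    BrokerResponse rejected = new BrokerResponseNative(
        QueryException.getException(QueryException.PQL_PARSING_ERROR,
            new UnsupportedOperationException("Unsupported SQL type - DML, GET API only supports DQL.")));
    System.out.println(rejected.toJsonString());
  }
}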
BrokerAdminApiApplication.java
@@ -27,6 +27,7 @@
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.core.api.ServiceAutoDiscoveryFeature;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -48,7 +49,7 @@ public class BrokerAdminApiApplication extends ResourceConfig {
private HttpServer _httpServer;

public BrokerAdminApiApplication(BrokerRoutingManager routingManager, BrokerRequestHandler brokerRequestHandler,
- BrokerMetrics brokerMetrics, PinotConfiguration brokerConf) {
+ BrokerMetrics brokerMetrics, PinotConfiguration brokerConf, SqlQueryExecutor sqlQueryExecutor) {
packages(RESOURCE_PACKAGE);
property(PINOT_CONFIGURATION, brokerConf);
if (brokerConf.getProperty(CommonConstants.Broker.BROKER_SERVICE_AUTO_DISCOVERY, false)) {
@@ -57,6 +58,7 @@ public BrokerAdminApiApplication(BrokerRoutingManager routingManager, BrokerRequ
register(new AbstractBinder() {
@Override
protected void configure() {
bind(sqlQueryExecutor).to(SqlQueryExecutor.class);
bind(routingManager).to(BrokerRoutingManager.class);
bind(brokerRequestHandler).to(BrokerRequestHandler.class);
bind(brokerMetrics).to(BrokerMetrics.class);
BaseBrokerStarter.java
@@ -58,6 +58,7 @@
import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.transport.ListenerConfig;
import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -103,6 +104,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
protected BrokerRoutingManager _routingManager;
protected AccessControlFactory _accessControlFactory;
protected BrokerRequestHandler _brokerRequestHandler;
protected SqlQueryExecutor _sqlQueryExecutor;
protected BrokerAdminApiApplication _brokerAdminApplication;
protected ClusterChangeMediator _clusterChangeMediator;
// Participant Helix manager handles Helix functionality such as state transitions and messages
@@ -266,10 +268,16 @@ public void start()
}
}
_brokerRequestHandler.start();

String controllerUrl = _brokerConf.getProperty(Broker.CONTROLLER_URL);
if (controllerUrl != null) {
_sqlQueryExecutor = new SqlQueryExecutor(controllerUrl);
} else {
_sqlQueryExecutor = new SqlQueryExecutor(_spectatorHelixManager);
}
LOGGER.info("Starting broker admin application on: {}", ListenerConfigUtil.toString(_listenerConfigs));
_brokerAdminApplication =
-     new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler, _brokerMetrics, _brokerConf);
+     new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler, _brokerMetrics, _brokerConf,
+         _sqlQueryExecutor);
_brokerAdminApplication.start(_listenerConfigs);

LOGGER.info("Initializing cluster change mediator");
MinionClient.java
@@ -27,10 +27,12 @@
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
import org.apache.pinot.spi.config.task.AdhocTaskConfig;
import org.apache.pinot.spi.utils.JsonUtils;


@@ -109,6 +111,25 @@ public String getTaskState(String taskName)
return responseString;
}

public Map<String, String> executeTask(AdhocTaskConfig adhocTaskConfig, @Nullable Map<String, String> headers)
throws IOException {
HttpPost httpPost = createHttpPostRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskExecute());
httpPost.setEntity(new StringEntity(adhocTaskConfig.toJsonString()));
if (headers != null) {
headers.remove("content-length");
headers.entrySet().forEach(entry -> httpPost.setHeader(entry.getKey(), entry.getValue()));
}
HttpResponse response = HTTP_CLIENT.execute(httpPost);
int statusCode = response.getStatusLine().getStatusCode();
final String responseString = IOUtils.toString(response.getEntity().getContent());
if (statusCode >= 400) {
throw new HttpException(String
.format("Unable to get tasks states map. Error code %d, Error message: %s", statusCode, responseString));
}
return JsonUtils.stringToObject(responseString, new TypeReference<Map<String, String>>() {
});
}

private HttpGet createHttpGetRequest(String uri) {
HttpGet httpGet = new HttpGet(uri);
httpGet.setHeader(ACCEPT, APPLICATION_JSON);
MinionRequestURLBuilder.java
@@ -90,4 +90,8 @@ public String forTaskTypeResume(String taskType) {
public String forTaskTypeDelete(String taskType) {
return StringUtil.join("/", _baseUrl, "tasks", taskType);
}

public String forTaskExecute() {
return StringUtil.join("/", _baseUrl, "tasks/execute");
}
}
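
Together, executeTask() and forTaskExecute() give the broker-side SqlQueryExecutor a way to POST an ad-hoc task to the controller's /tasks/execute endpoint, forwarding the caller's HTTP headers (minus content-length). A hedged usage sketch follows; how a MinionClient instance is obtained, the AdhocTaskConfig JSON field names, and the SegmentGenerationAndPushTask task type are assumptions not defined in this diff.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.minion.MinionClient;   // assumed package for the client shown above
import org.apache.pinot.spi.config.task.AdhocTaskConfig;
import org.apache.pinot.spi.utils.JsonUtils;

public class AdhocTaskSubmissionExample {
  // Submits a one-off task and returns the task-name -> state map parsed from the controller response.
  public static Map<String, String> submit(MinionClient minionClient) throws IOException {
    // Assumed field names; AdhocTaskConfig's definition is not part of this diff.
    AdhocTaskConfig taskConfig = JsonUtils.stringToObject(
        "{\"taskType\":\"SegmentGenerationAndPushTask\",\"tableName\":\"myTable\","
            + "\"taskName\":\"adhoc-ingest-1\",\"taskConfigs\":{}}",
        AdhocTaskConfig.class);
    Map<String, String> headers = new HashMap<>();
    headers.put("Authorization", "Basic <credentials>");  // forwarded verbatim; content-length is dropped
    return minionClient.executeTask(taskConfig, headers);
  }
}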
CalciteSqlParser.java
@@ -60,6 +60,7 @@
import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.utils.Pairs;
import org.apache.pinot.sql.parsers.parser.SqlInsertFromFile;
import org.apache.pinot.sql.parsers.parser.SqlParserImpl;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriter;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
@@ -107,6 +108,40 @@ private static String removeTerminatingSemicolon(String sql) {
return sql;
}

public static SqlNodeAndOptions compileToSqlNodeAndOptions(String sql)
throws Exception {
// Remove the comments from the query
sql = removeComments(sql);

// Remove the terminating semicolon from the query
sql = removeTerminatingSemicolon(sql);

// Extract OPTION statements from sql as Calcite Parser doesn't parse it.
List<String> options = extractOptionsFromSql(sql);
if (!options.isEmpty()) {
sql = removeOptionsFromSql(sql);
}

try (StringReader inStream = new StringReader(sql)) {
SqlParserImpl sqlParser = newSqlParser(inStream);
return new SqlNodeAndOptions(sqlParser.parseSqlStmtEof(), options);
} catch (Throwable e) {
throw new SqlCompilationException("Caught exception while parsing query: " + sql, e);
}
}

public static PinotSqlType extractSqlType(SqlNode sqlNode) {
switch (sqlNode.getKind()) {
case OTHER_DDL:
if (sqlNode instanceof SqlInsertFromFile) {
return PinotSqlType.DML;
}
throw new SqlCompilationException("Unsupported SqlNode type - " + sqlNode.getKind());
default:
return PinotSqlType.DQL;
}
}

public static PinotQuery compileToPinotQuery(String sql)
throws SqlCompilationException {
// Remove the comments from the query
@@ -324,10 +359,7 @@ static SqlParserImpl newSqlParser(StringReader inStream) {
return sqlParser;
}

- private static void setOptions(PinotQuery pinotQuery, List<String> optionsStatements) {
-   if (optionsStatements.isEmpty()) {
-     return;
-   }
+ public static Map<String, String> extractOptionsMap(List<String> optionsStatements) {
Map<String, String> options = new HashMap<>();
for (String optionsStatement : optionsStatements) {
for (String option : optionsStatement.split(",")) {
@@ -338,10 +370,17 @@ private static void setOptions(PinotQuery pinotQuery, List<String> optionsStatem
options.put(splits[0].trim(), splits[1].trim());
}
}
- pinotQuery.setQueryOptions(options);
+ return options;
}

private static void setOptions(PinotQuery pinotQuery, List<String> optionsStatements) {
if (optionsStatements.isEmpty()) {
return;
}
pinotQuery.setQueryOptions(extractOptionsMap(optionsStatements));
}

- private static PinotQuery compileSqlNodeToPinotQuery(SqlNode sqlNode) {
+ public static PinotQuery compileSqlNodeToPinotQuery(SqlNode sqlNode) {
PinotQuery pinotQuery = new PinotQuery();
if (sqlNode instanceof SqlExplain) {
// Extract sql node for the query
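
A minimal sketch of how the two new CalciteSqlParser entry points added above fit together: compileToSqlNodeAndOptions() parses the statement after stripping comments, the trailing semicolon, and OPTION clauses, and extractSqlType() classifies the resulting SqlNode. The INSERT ... FROM FILE text is an assumed example of the grammar added via SqlInsertFromFile; it is not spelled out in this hunk.

import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;

public class SqlTypeClassificationExample {
  public static void main(String[] args) throws Exception {
    // A SELECT never hits the OTHER_DDL branch, so extractSqlType() falls through to DQL;
    // the OPTION(...) contents are carried separately in getOptions().
    SqlNodeAndOptions select = CalciteSqlParser.compileToSqlNodeAndOptions(
        "SELECT count(*) FROM myTable OPTION(timeoutMs=5000)");
    System.out.println(CalciteSqlParser.extractSqlType(select.getSqlNode()));  // DQL
    System.out.println(select.getOptions());

    // An insert-from-file statement parses to SqlInsertFromFile and is classified as DML.
    SqlNodeAndOptions insert = CalciteSqlParser.compileToSqlNodeAndOptions(
        "INSERT INTO myTable FROM FILE 's3://my-bucket/path/to/data/'");
    System.out.println(CalciteSqlParser.extractSqlType(insert.getSqlNode()));  // DML
  }
}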
PinotSqlType.java
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.sql.parsers;

public enum PinotSqlType {
/* Data Query Language (DQL), e.g. SELECT */
DQL,
/* Data Control Language (DCL), e.g. GRANT, REVOKE */
DCL,
/* Data Manipulation Language (DML), e.g. INSERT, UPSERT, UPDATE, DELETE */
DML,
/* Data Definition Language (DDL), e.g. CREATE, DROP, ALTER, TRUNCATE */
DDL
}
SqlNodeAndOptions.java
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.sql.parsers;

import java.util.List;
import org.apache.calcite.sql.SqlNode;


public class SqlNodeAndOptions {
private final SqlNode _sqlNode;
private final List<String> _options;

public SqlNodeAndOptions(SqlNode sqlNode, List<String> options) {
_sqlNode = sqlNode;
_options = options;
}

public SqlNode getSqlNode() {
return _sqlNode;
}

public List<String> getOptions() {
return _options;
}
}
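
Finally, the raw OPTION strings carried by SqlNodeAndOptions.getOptions() can be turned into a key/value map with the new public CalciteSqlParser.extractOptionsMap(), which splits each statement on ',' and then '=' and trims the parts. A small sketch with illustrative option keys:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.pinot.sql.parsers.CalciteSqlParser;

public class OptionsExtractionExample {
  public static void main(String[] args) {
    List<String> rawOptions = Arrays.asList("timeoutMs=5000, enableNullHandling=true");
    Map<String, String> options = CalciteSqlParser.extractOptionsMap(rawOptions);
    // Two entries after splitting and trimming: timeoutMs -> 5000, enableNullHandling -> true
    System.out.println(options);
  }
}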