Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
Expand Down Expand Up @@ -169,6 +170,70 @@ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResp
}
}

@GET
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("query")
@ApiOperation(value = "Querying pinot using MultiStage Query Engine")
@ApiResponses(value = {
    @ApiResponse(code = 200, message = "Query response"),
    @ApiResponse(code = 500, message = "Internal Server Error")
})
@ManualAuthorization
public void processSqlWithMultiStageQueryEngineGet(
    @ApiParam(value = "Query", required = true) @QueryParam("sql") String query,
    @Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext,
    @Context HttpHeaders httpHeaders) {
  try {
    // Wrap the raw SQL string in the standard request payload; the trailing 'true' forces
    // the multi-stage engine, and onlyDql=true because a GET must not carry DML.
    ObjectNode requestJson = JsonUtils.newObjectNode();
    requestJson.put(Request.SQL, query);
    BrokerResponse brokerResponse =
        executeSqlQuery(requestJson, makeHttpIdentity(requestContext), true, httpHeaders, true);
    asyncResponse.resume(brokerResponse.toJsonString());
  } catch (WebApplicationException wae) {
    // Preserve the original HTTP status/entity chosen by the thrower.
    asyncResponse.resume(wae);
  } catch (Exception e) {
    LOGGER.error("Caught exception while processing GET request", e);
    _brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_GET_EXCEPTIONS, 1L);
    // Include the exception message in the 500 entity, consistent with the POST handler,
    // so clients get a diagnostic instead of a bare status code.
    asyncResponse.resume(new WebApplicationException(e,
        Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build()));
  }
}

@POST
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("query")
@ApiOperation(value = "Querying pinot using MultiStage Query Engine")
@ApiResponses(value = {
    @ApiResponse(code = 200, message = "Query response"),
    @ApiResponse(code = 500, message = "Internal Server Error")
})
@ManualAuthorization
public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended AsyncResponse asyncResponse,
    @Context org.glassfish.grizzly.http.server.Request requestContext,
    @Context HttpHeaders httpHeaders) {
  try {
    // The POST body must be a JSON object carrying the query under the 'sql' key.
    JsonNode payload = JsonUtils.stringToJsonNode(query);
    if (!payload.has(Request.SQL)) {
      throw new IllegalStateException("Payload is missing the query string field 'sql'");
    }
    // onlyDql=false (DML permitted over POST); the trailing 'true' forces the multi-stage engine.
    BrokerResponse response =
        executeSqlQuery((ObjectNode) payload, makeHttpIdentity(requestContext), false, httpHeaders, true);
    asyncResponse.resume(response.toJsonString());
  } catch (WebApplicationException wae) {
    // Already carries the intended HTTP status — pass it through untouched.
    asyncResponse.resume(wae);
  } catch (Exception e) {
    LOGGER.error("Caught exception while processing POST request", e);
    _brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_POST_EXCEPTIONS, 1L);
    Response errorResponse =
        Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
    asyncResponse.resume(new WebApplicationException(e, errorResponse));
  }
}

@DELETE
@Path("query/{queryId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
Expand All @@ -185,7 +250,7 @@ public String cancelQuery(
@ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs")
@DefaultValue("3000") int timeoutMs,
@ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false")
boolean verbose) {
boolean verbose) {
try {
Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
Expand Down Expand Up @@ -226,12 +291,21 @@ public Map<Long, String> getRunningQueries() {
/**
 * Executes a SQL query without forcing the multi-stage engine.
 * Backward-compatible overload that delegates to the 5-arg variant with forceUseMultiStage=false.
 */
private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterIdentity httpRequesterIdentity,
    boolean onlyDql, HttpHeaders httpHeaders)
    throws Exception {
  return executeSqlQuery(sqlRequestJson, httpRequesterIdentity, onlyDql, httpHeaders, false);
}

private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterIdentity httpRequesterIdentity,
boolean onlyDql, HttpHeaders httpHeaders, boolean forceUseMultiStage)
throws Exception {
SqlNodeAndOptions sqlNodeAndOptions;
try {
sqlNodeAndOptions = RequestUtils.parseQuery(sqlRequestJson.get(Request.SQL).asText(), sqlRequestJson);
} catch (Exception e) {
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}
if (forceUseMultiStage) {
sqlNodeAndOptions.setExtraOptions(ImmutableMap.of(Request.QueryOptionKey.USE_MULTISTAGE_ENGINE, "true"));
}
PinotSqlType sqlType = sqlNodeAndOptions.getSqlType();
if (onlyDql && sqlType != PinotSqlType.DQL) {
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
}

boolean traceEnabled = Boolean.parseBoolean(
request.has(CommonConstants.Broker.Request.TRACE) ? request.get(CommonConstants.Broker.Request.TRACE).asText()
: "false");
sqlNodeAndOptions.getOptions().getOrDefault(CommonConstants.Broker.Request.TRACE, "false"));

ResultTable queryResults;
Map<Integer, ExecutionStatsAggregator> stageIdStatsMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
Expand Down Expand Up @@ -61,22 +60,25 @@ public class JsonAsyncHttpPinotClientTransport implements PinotClientTransport<C
private final int _brokerReadTimeout;
private final AsyncHttpClient _httpClient;
private final String _extraOptionStr;
private final boolean _useMultiStageEngine;

/** Default transport: plain HTTP, 60s read timeout, no extra query options, V1 engine. */
public JsonAsyncHttpPinotClientTransport() {
  _brokerReadTimeout = 60000;
  _useMultiStageEngine = false;
  _scheme = CommonConstants.HTTP_PROTOCOL;
  _headers = new HashMap<>();
  _extraOptionStr = DEFAULT_EXTRA_QUERY_OPTION_STRING;
  // The HTTP client must be built after the read timeout is set, since it reuses that value.
  _httpClient = Dsl.asyncHttpClient(Dsl.config().setRequestTimeout(_brokerReadTimeout));
}

public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String scheme, String extraOptionString,
@Nullable SSLContext sslContext, ConnectionTimeouts connectionTimeouts, TlsProtocols tlsProtocols,
@Nullable String appId) {
boolean useMultiStageEngine, @Nullable SSLContext sslContext, ConnectionTimeouts connectionTimeouts,
TlsProtocols tlsProtocols, @Nullable String appId) {
_brokerReadTimeout = connectionTimeouts.getReadTimeoutMs();
_headers = headers;
_scheme = scheme;
_extraOptionStr = StringUtils.isEmpty(extraOptionString) ? DEFAULT_EXTRA_QUERY_OPTION_STRING : extraOptionString;
_useMultiStageEngine = useMultiStageEngine;

Builder builder = Dsl.config();
if (sslContext != null) {
Expand All @@ -92,28 +94,6 @@ public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String sch
_httpClient = Dsl.asyncHttpClient(builder.build());
}

/**
 * Constructor variant accepting a Netty {@code SslContext} instead of a JDK {@code SSLContext}.
 * Builds the async HTTP client with the configured timeouts, TLS protocols and user agent.
 *
 * NOTE(review): this overload never assigns the final {@code _useMultiStageEngine} field added
 * in this change, so it cannot compile alongside it — it appears as removed in this diff.
 */
public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String scheme, String extraOptionStr,
    @Nullable SslContext sslContext, ConnectionTimeouts connectionTimeouts, TlsProtocols tlsProtocols,
    @Nullable String appId) {
  _brokerReadTimeout = connectionTimeouts.getReadTimeoutMs();
  _headers = headers;
  _scheme = scheme;
  // Fall back to the default option string when the caller passes null/empty.
  _extraOptionStr = StringUtils.isEmpty(extraOptionStr) ? DEFAULT_EXTRA_QUERY_OPTION_STRING : extraOptionStr;

  Builder builder = Dsl.config();
  if (sslContext != null) {
    builder.setSslContext(sslContext);
  }

  builder.setRequestTimeout(_brokerReadTimeout)
      .setReadTimeout(connectionTimeouts.getReadTimeoutMs())
      .setConnectTimeout(connectionTimeouts.getConnectTimeoutMs())
      .setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
      .setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua", appId))
      .setEnabledProtocols(tlsProtocols.getEnabledProtocols().toArray(new String[0]));
  _httpClient = Dsl.asyncHttpClient(builder.build());
}

@Override
public BrokerResponse executeQuery(String brokerAddress, String query)
throws PinotClientException {
Expand All @@ -131,29 +111,29 @@ public CompletableFuture<BrokerResponse> executeQueryAsync(String brokerAddress,
json.put("sql", query);
json.put("queryOptions", _extraOptionStr);

String url = _scheme + "://" + brokerAddress + "/query/sql";
String url = String.format("%s://%s%s", _scheme, brokerAddress, _useMultiStageEngine ? "/query" : "/query/sql");
BoundRequestBuilder requestBuilder = _httpClient.preparePost(url);

if (_headers != null) {
_headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
}
LOGGER.debug("Sending query {} to {}", query, url);
return requestBuilder.addHeader("Content-Type", "application/json; charset=utf-8").setBody(json.toString())
.execute().toCompletableFuture().thenApply(httpResponse -> {
LOGGER.debug("Completed query, HTTP status is {}", httpResponse.getStatusCode());
.execute().toCompletableFuture().thenApply(httpResponse -> {
LOGGER.debug("Completed query, HTTP status is {}", httpResponse.getStatusCode());

if (httpResponse.getStatusCode() != 200) {
throw new PinotClientException(
if (httpResponse.getStatusCode() != 200) {
throw new PinotClientException(
"Pinot returned HTTP status " + httpResponse.getStatusCode() + ", expected 200");
}

String responseBody = httpResponse.getResponseBody(StandardCharsets.UTF_8);
try {
return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
} catch (JsonProcessingException e) {
throw new CompletionException(e);
}
});
}

String responseBody = httpResponse.getResponseBody(StandardCharsets.UTF_8);
try {
return BrokerResponse.fromJson(OBJECT_READER.readTree(responseBody));
} catch (JsonProcessingException e) {
throw new CompletionException(e);
}
});
} catch (Exception e) {
throw new PinotClientException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@ public class JsonAsyncHttpPinotClientTransportFactory implements PinotClientTran
private int _handshakeTimeoutMs = Integer.parseInt(DEFAULT_BROKER_HANDSHAKE_TIMEOUT_MS);
private String _appId = null;
private String _extraOptionString;
private boolean _useMultiStageEngine;

@Override
public PinotClientTransport buildTransport() {
  // Assemble timeout and TLS settings, then hand all accumulated configuration to the transport.
  TlsProtocols tlsProtocols = TlsProtocols.defaultProtocols(_tlsV10Enabled);
  ConnectionTimeouts timeouts =
      ConnectionTimeouts.create(_readTimeoutMs, _connectTimeoutMs, _handshakeTimeoutMs);
  return new JsonAsyncHttpPinotClientTransport(_headers, _scheme, _extraOptionString, _useMultiStageEngine,
      _sslContext, timeouts, tlsProtocols, _appId);
}

public Map<String, String> getHeaders() {
Expand Down Expand Up @@ -103,6 +104,7 @@ public JsonAsyncHttpPinotClientTransportFactory withConnectionProperties(Propert
System.getProperties().getProperty("broker.tlsV10Enabled", DEFAULT_BROKER_TLS_V10_ENABLED));

_extraOptionString = properties.getProperty("queryOptions", "");
_useMultiStageEngine = Boolean.parseBoolean(properties.getProperty("useMultiStageEngine", "false"));
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
protected List<StreamDataServerStartable> _kafkaStarters;

protected org.apache.pinot.client.Connection _pinotConnection;
protected org.apache.pinot.client.Connection _pinotConnectionV2;
protected Connection _h2Connection;
protected QueryGenerator _queryGenerator;

Expand Down Expand Up @@ -506,6 +507,14 @@ protected TableConfig getRealtimeTableConfig() {
* @return Pinot connection
*/
protected org.apache.pinot.client.Connection getPinotConnection() {
if (useMultiStageQueryEngine()) {
if (_pinotConnectionV2 == null) {
Properties properties = getPinotConnectionProperties();
properties.put("useMultiStageEngine", "true");
_pinotConnectionV2 = ConnectionFactory.fromZookeeper(properties, getZkUrl() + "/" + getHelixClusterName());
}
return _pinotConnectionV2;
}
if (_pinotConnection == null) {
_pinotConnection =
ConnectionFactory.fromZookeeper(getPinotConnectionProperties(), getZkUrl() + "/" + getHelixClusterName());
Expand Down Expand Up @@ -753,7 +762,7 @@ protected void testQuery(String query)
protected void testQuery(String pinotQuery, String h2Query)
throws Exception {
ClusterIntegrationTestUtils.testQuery(pinotQuery, getBrokerBaseApiUrl(), getPinotConnection(), h2Query,
getH2Connection(), null, getExtraQueryProperties());
getH2Connection(), null, getExtraQueryProperties(), useMultiStageQueryEngine());
}

/**
Expand All @@ -762,6 +771,6 @@ protected void testQuery(String pinotQuery, String h2Query)
protected void testQueryWithMatchingRowCount(String pinotQuery, String h2Query)
throws Exception {
ClusterIntegrationTestUtils.testQueryWithMatchingRowCount(pinotQuery, getBrokerBaseApiUrl(), getPinotConnection(),
h2Query, getH2Connection(), null, getExtraQueryProperties());
h2Query, getH2Connection(), null, getExtraQueryProperties(), useMultiStageQueryEngine());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public static void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic,
}
}
CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
for (String recordCsv: csvRecords) {
for (String recordCsv : csvRecords) {
try (CSVParser parser = CSVParser.parse(recordCsv, csvFormat)) {
for (CSVRecord csv : parser) {
byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(counter++)
Expand Down Expand Up @@ -650,7 +650,7 @@ static void testQuery(String pinotQuery, String queryResourceUrl, org.apache.pin
/**
 * Runs a Pinot query and an H2 query and compares results.
 * Convenience overload: no extra JSON properties and the single-stage (V1) engine.
 */
static void testQuery(String pinotQuery, String queryResourceUrl, org.apache.pinot.client.Connection pinotConnection,
    String h2Query, Connection h2Connection, @Nullable Map<String, String> headers)
    throws Exception {
  testQuery(pinotQuery, queryResourceUrl, pinotConnection, h2Query, h2Connection, headers, null, false);
}

/**
Expand All @@ -659,33 +659,35 @@ static void testQuery(String pinotQuery, String queryResourceUrl, org.apache.pin
*/
/**
 * Runs a Pinot query and an H2 query, asserting only that the row counts match
 * (matchingRowCount=true in the internal call), routed through the broker (viaController=false).
 * Any failure is reported through {@code failure(...)} with both query strings for context.
 */
static void testQueryWithMatchingRowCount(String pinotQuery, String queryResourceUrl,
    org.apache.pinot.client.Connection pinotConnection, String h2Query, Connection h2Connection,
    @Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties,
    boolean useMultiStageQueryEngine)
    throws Exception {
  try {
    testQueryInternal(pinotQuery, queryResourceUrl, pinotConnection, h2Query, h2Connection, headers,
        extraJsonProperties, useMultiStageQueryEngine, true, false);
  } catch (Exception e) {
    failure(pinotQuery, h2Query, e);
  }
}

/**
 * Runs a Pinot query and an H2 query and compares full results
 * (matchingRowCount=false, viaController=false in the internal call).
 * Any failure is reported through {@code failure(...)} with both query strings for context.
 */
static void testQuery(String pinotQuery, String queryResourceUrl, org.apache.pinot.client.Connection pinotConnection,
    String h2Query, Connection h2Connection, @Nullable Map<String, String> headers,
    @Nullable Map<String, String> extraJsonProperties, boolean useMultiStageQueryEngine) {
  try {
    testQueryInternal(pinotQuery, queryResourceUrl, pinotConnection, h2Query, h2Connection, headers,
        extraJsonProperties, useMultiStageQueryEngine, false, false);
  } catch (Exception e) {
    failure(pinotQuery, h2Query, e);
  }
}

static void testQueryViaController(String pinotQuery, String queryResourceUrl,
org.apache.pinot.client.Connection pinotConnection, String h2Query, Connection h2Connection,
@Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties) {
@Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties,
boolean useMultiStageQueryEngine) {
try {
testQueryInternal(pinotQuery, queryResourceUrl, pinotConnection, h2Query, h2Connection, headers,
extraJsonProperties, false, true);
extraJsonProperties, useMultiStageQueryEngine, false, true);
} catch (Exception e) {
failure(pinotQuery, h2Query, e);
}
Expand All @@ -694,14 +696,16 @@ static void testQueryViaController(String pinotQuery, String queryResourceUrl,
private static void testQueryInternal(String pinotQuery, String queryResourceUrl,
org.apache.pinot.client.Connection pinotConnection, String h2Query, Connection h2Connection,
@Nullable Map<String, String> headers, @Nullable Map<String, String> extraJsonProperties,
boolean matchingRowCount, boolean viaController)
boolean useMultiStageQueryEngine, boolean matchingRowCount, boolean viaController)
throws Exception {
// broker response
JsonNode pinotResponse;
if (viaController) {
pinotResponse = ClusterTest.postQueryToController(pinotQuery, queryResourceUrl, headers, extraJsonProperties);
} else {
pinotResponse = ClusterTest.postQuery(pinotQuery, queryResourceUrl, headers, extraJsonProperties);
pinotResponse =
ClusterTest.postQuery(pinotQuery, getBrokerQueryApiUrl(queryResourceUrl, useMultiStageQueryEngine), headers,
extraJsonProperties);
}
if (!pinotResponse.get("exceptions").isEmpty()) {
throw new RuntimeException("Got Exceptions from Query Response: " + pinotResponse);
Expand Down Expand Up @@ -824,10 +828,15 @@ private static String getExplainPlan(String pinotQuery, String brokerUrl, @Nulla
@Nullable Map<String, String> extraJsonProperties)
throws Exception {
JsonNode explainPlanForResponse =
ClusterTest.postQuery("explain plan for " + pinotQuery, brokerUrl, headers, extraJsonProperties);
ClusterTest.postQuery("explain plan for " + pinotQuery, getBrokerQueryApiUrl(brokerUrl, false), headers,
extraJsonProperties);
return ExplainPlanUtils.formatExplainPlan(explainPlanForResponse);
}

/**
 * Builds the broker query endpoint URL: the multi-stage (V2) engine is served at
 * {@code /query}, while the V1 engine keeps the legacy {@code /query/sql} path.
 */
public static String getBrokerQueryApiUrl(String brokerBaseApiUrl, boolean useMultiStageQueryEngine) {
  String path;
  if (useMultiStageQueryEngine) {
    path = "/query";
  } else {
    path = "/query/sql";
  }
  return brokerBaseApiUrl + path;
}

private static int getH2ExpectedValues(Set<String> expectedValues, List<String> expectedOrderByValues,
ResultSet h2ResultSet, ResultSetMetaData h2MetaData, Collection<String> orderByColumns)
throws SQLException {
Expand Down Expand Up @@ -1021,6 +1030,7 @@ private static void failure(String pinotQuery, String h2Query, @Nullable Excepti
String failureMessage = "Caught exception while testing query!";
failure(pinotQuery, h2Query, failureMessage, e);
}

/**
* Helper method to report failures.
*
Expand Down
Loading