Skip to content

Commit

Permalink
Add broker API to run a query on both query engines and compare results
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmayya committed Aug 22, 2024
1 parent 6d64650 commit 0e4a24a
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -63,12 +66,16 @@
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
Expand Down Expand Up @@ -236,6 +243,67 @@ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended Asy
}
}

/**
 * Runs the SQL query from the request payload on both the single stage and the multi stage query engine and
 * resumes the async response with both broker responses plus an analysis of the differences between them.
 *
 * <p>Both executions are submitted to {@code _executor}; this method blocks until both have completed.
 *
 * @param query JSON request payload; must contain the 'sql' field
 * @param asyncResponse suspended response, resumed with the comparison result or with an error
 * @param requestContext HTTP request used to build the requester identity
 * @param httpHeaders HTTP headers forwarded to the query execution
 */
@POST
@Produces(MediaType.APPLICATION_JSON)
@Path("query/compare")
@ApiOperation(value = "Query Pinot using both the single stage query engine and the multi stage query engine and "
    + "compare the results")
@ApiResponses(value = {
    @ApiResponse(code = 200, message = "Query result comparison response"),
    @ApiResponse(code = 500, message = "Internal Server Error")
})
@ManualAuthorization
public void processSqlQueryWithBothEnginesAndCompareResults(String query, @Suspended AsyncResponse asyncResponse,
    @Context org.glassfish.grizzly.http.server.Request requestContext,
    @Context HttpHeaders httpHeaders) {
  try {
    JsonNode requestJson = JsonUtils.stringToJsonNode(query);
    // TODO: Support inputting different v1 and v2 SQL queries
    if (!requestJson.has(Request.SQL)) {
      throw new IllegalStateException("Payload is missing the query string field 'sql'");
    }
    ObjectNode sqlRequestJson = (ObjectNode) requestJson;

    CompletableFuture<BrokerResponse> v1Response = CompletableFuture.supplyAsync(
        () -> executeSqlQueryForComparison(sqlRequestJson, requestContext, httpHeaders, false), _executor);
    CompletableFuture<BrokerResponse> v2Response = CompletableFuture.supplyAsync(
        () -> executeSqlQueryForComparison(sqlRequestJson, requestContext, httpHeaders, true), _executor);

    // join() rethrows a failure of either task wrapped in an unchecked CompletionException
    CompletableFuture.allOf(v1Response, v2Response).join();

    asyncResponse.resume(getPinotQueryComparisonResponse(requestJson.get(Request.SQL).asText(), v1Response.join(),
        v2Response.join()));
  } catch (WebApplicationException wae) {
    asyncResponse.resume(wae);
  } catch (Exception e) {
    // A WebApplicationException thrown inside one of the async tasks arrives here wrapped by the future, so it
    // would never match the catch clause above. Look through the cause chain and resume with it unchanged so that
    // its own response status is preserved instead of being reported as an internal server error.
    for (Throwable cause = e.getCause(); cause != null; cause = cause.getCause()) {
      if (cause instanceof WebApplicationException) {
        asyncResponse.resume(cause);
        return;
      }
    }
    LOGGER.error("Caught exception while processing request", e);
    asyncResponse.resume(
        new WebApplicationException(e,
            Response
                .status(Response.Status.INTERNAL_SERVER_ERROR)
                .entity(e.getMessage())
                .build()));
  }
}

/**
 * Executes the request as a DQL-only query for the comparison endpoint, converting checked exceptions into
 * unchecked ones so that the call can be used from a {@link CompletableFuture} supplier.
 *
 * @param sqlRequestJson request payload containing the SQL query
 * @param requestContext HTTP request used to build the requester identity
 * @param httpHeaders HTTP headers forwarded to the query execution
 * @param forceUseMultiStage whether to force the multi stage query engine
 * @return the broker response of the execution
 */
private BrokerResponse executeSqlQueryForComparison(ObjectNode sqlRequestJson,
    org.glassfish.grizzly.http.server.Request requestContext, HttpHeaders httpHeaders, boolean forceUseMultiStage) {
  try {
    return executeSqlQuery(sqlRequestJson, makeHttpIdentity(requestContext), true, httpHeaders, forceUseMultiStage);
  } catch (RuntimeException e) {
    // Already unchecked (this includes WebApplicationException); no extra wrapper needed
    throw e;
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

@DELETE
@Path("query/{queryId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
Expand Down Expand Up @@ -312,7 +380,7 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
PinotSqlType sqlType = sqlNodeAndOptions.getSqlType();
if (onlyDql && sqlType != PinotSqlType.DQL) {
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
new UnsupportedOperationException("Unsupported SQL type - " + sqlType + ", GET API only supports DQL.")));
new UnsupportedOperationException("Unsupported SQL type - " + sqlType + ", this API only supports DQL.")));
}
switch (sqlType) {
case DQL:
Expand Down Expand Up @@ -379,4 +447,70 @@ static Response getPinotQueryResponse(BrokerResponse brokerResponse)
.entity((StreamingOutput) brokerResponse::toOutputStream).type(MediaType.APPLICATION_JSON)
.build();
}

/**
 * Builds the HTTP response for the query comparison API.
 *
 * <p>The JSON entity has three fields: {@code v1Response} and {@code v2Response} hold the two broker responses, and
 * {@code comparisonAnalysis} holds the list of differences found between them.
 *
 * @param query SQL query that produced both responses
 * @param v1Response broker response passed as the v1 result
 * @param v2Response broker response passed as the v2 result
 * @return a 200 response carrying the comparison JSON
 */
static Response getPinotQueryComparisonResponse(String query, BrokerResponse v1Response, BrokerResponse v2Response) {
  JsonNode v1Json = JsonUtils.objectToJsonNode(v1Response);
  JsonNode v2Json = JsonUtils.objectToJsonNode(v2Response);
  JsonNode analysisJson = JsonUtils.objectToJsonNode(analyzeDifferences(query, v1Response, v2Response));

  ObjectNode payload = JsonUtils.newObjectNode();
  payload.set("v1Response", v1Json);
  payload.set("v2Response", v2Json);
  payload.set("comparisonAnalysis", analysisJson);

  Response.ResponseBuilder builder = Response.ok();
  builder.header(PINOT_QUERY_ERROR_CODE_HEADER, -1);
  builder.entity(payload);
  builder.type(MediaType.APPLICATION_JSON);
  return builder.build();
}

/**
 * Lists human-readable descriptions of the differences between the two broker responses for the same query.
 *
 * <p>The checks run in this order, and several of them stop the analysis when they find a difference: exceptions in
 * either response, result table presence, number of columns, column data types, number of rows, and (for
 * aggregation queries without group-by) the values of the first row.
 *
 * @param query SQL query that produced both responses
 * @param v1Response broker response passed as the v1 result
 * @param v2Response broker response passed as the v2 result
 * @return the differences found; empty when none were detected
 */
private static List<String> analyzeDifferences(String query, BrokerResponse v1Response, BrokerResponse v2Response) {
  List<String> differences = new ArrayList<>();

  if (v1Response.getExceptionsSize() != 0 || v2Response.getExceptionsSize() != 0) {
    differences.add("Exception encountered while running the query on one or both query engines");
    return differences;
  }

  // Guard against a missing result table: dereferencing it below would throw a NullPointerException
  boolean v1HasResultTable = v1Response.getResultTable() != null;
  boolean v2HasResultTable = v2Response.getResultTable() != null;
  if (!v1HasResultTable || !v2HasResultTable) {
    if (v1HasResultTable != v2HasResultTable) {
      differences.add("Mismatch in result table presence. v1: " + (v1HasResultTable ? "present" : "absent")
          + ", v2: " + (v2HasResultTable ? "present" : "absent"));
    }
    return differences;
  }

  DataSchema v1DataSchema = v1Response.getResultTable().getDataSchema();
  DataSchema v2DataSchema = v2Response.getResultTable().getDataSchema();
  DataSchema.ColumnDataType[] v1ResponseTypes = v1DataSchema.getColumnDataTypes();
  DataSchema.ColumnDataType[] v2ResponseTypes = v2DataSchema.getColumnDataTypes();

  if (v1ResponseTypes.length != v2ResponseTypes.length) {
    differences.add("Mismatch in number of columns returned. v1: " + v1ResponseTypes.length
        + ", v2: " + v2ResponseTypes.length);
    return differences;
  }

  String[] v1ColumnNames = v1DataSchema.getColumnNames();
  String[] v2ColumnNames = v2DataSchema.getColumnNames();
  for (int i = 0; i < v1ResponseTypes.length; i++) {
    if (v1ResponseTypes[i] != v2ResponseTypes[i]) {
      String columnName = v1ColumnNames[i].equals(v2ColumnNames[i])
          ? v1ColumnNames[i]
          : v1ColumnNames[i] + " / " + v2ColumnNames[i];
      differences.add("Mismatch in column data type for column with name " + columnName
          + ". v1 type: " + v1ResponseTypes[i] + ", v2 type: " + v2ResponseTypes[i]);
    }
  }

  if (v1Response.getNumRowsResultSet() != v2Response.getNumRowsResultSet()) {
    differences.add("Mismatch in number of rows returned. v1: " + v1Response.getNumRowsResultSet()
        + ", v2: " + v2Response.getNumRowsResultSet());
    return differences;
  }

  QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query);
  if (QueryContextUtils.isAggregationQuery(queryContext) && queryContext.getGroupByExpressions() == null) {
    // Aggregation-only query: compare the values of the first row. Skip the comparison when either side has no
    // rows, since reading row 0 would throw an IndexOutOfBoundsException.
    List<Object[]> v1Rows = v1Response.getResultTable().getRows();
    List<Object[]> v2Rows = v2Response.getResultTable().getRows();
    if (!v1Rows.isEmpty() && !v2Rows.isEmpty()) {
      Object[] v1Row = v1Rows.get(0);
      Object[] v2Row = v2Rows.get(0);
      for (int i = 0; i < v1ColumnNames.length; i++) {
        if (!Objects.equals(v1Row[i], v2Row[i])) {
          differences.add("Mismatch in aggregation value for " + v1ColumnNames[i]
              + ". v1 value: " + v1Row[i]
              + ", v2 value: " + v2Row[i]);
        }
      }
    }
  }

  // TODO: Compare response row values if it makes sense for the query type. Handle edge cases with group trimming,
  //  non-deterministic results for order by queries with limits etc.

  return differences;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
*/
package org.apache.pinot.broker.api.resources;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.core.Response;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -51,4 +58,53 @@ public void testGetPinotQueryResponse()
Assert.assertEquals(tableDoesNotExistResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).get(0),
TABLE_DOES_NOT_EXIST_ERROR_CODE);
}

/**
 * Verifies the comparison analysis for two scenarios: a column data type mismatch on an aggregation query, and a
 * row count mismatch on a selection query.
 */
@Test
public void testPinotQueryComparison() throws Exception {
  ObjectMapper mapper = new ObjectMapper();
  TypeReference<List<String>> analysisType = new TypeReference<List<String>>() { };

  // Scenario 1: aggregation type difference (DOUBLE in v1, LONG in v2) with equal values
  DataSchema.ColumnDataType[] v1AggregationTypes = {DataSchema.ColumnDataType.DOUBLE};
  DataSchema.ColumnDataType[] v2AggregationTypes = {DataSchema.ColumnDataType.LONG};
  DataSchema v1DataSchema = new DataSchema(new String[]{"sum(col)"}, v1AggregationTypes);
  DataSchema v2DataSchema = new DataSchema(new String[]{"EXPR$0"}, v2AggregationTypes);

  BrokerResponse v1BrokerResponse = new BrokerResponseNative();
  BrokerResponse v2BrokerResponse = new BrokerResponseNative();
  v1BrokerResponse.setResultTable(new ResultTable(v1DataSchema, List.<Object[]>of(new Object[]{1234})));
  v2BrokerResponse.setResultTable(new ResultTable(v2DataSchema, List.<Object[]>of(new Object[]{1234})));

  Response aggregationComparison = PinotClientRequest.getPinotQueryComparisonResponse(
      "SELECT SUM(col) FROM mytable", v1BrokerResponse, v2BrokerResponse);
  ObjectNode comparisonResponse = (ObjectNode) aggregationComparison.getEntity();
  List<String> comparisonAnalysis =
      mapper.readerFor(analysisType).readValue(comparisonResponse.get("comparisonAnalysis"));

  Assert.assertEquals(comparisonAnalysis.size(), 1);
  Assert.assertTrue(comparisonAnalysis.get(0).contains("v1 type: DOUBLE, v2 type: LONG"));

  // Scenario 2: default limit in v1 (10 rows in v1, 100 rows in v2)
  v1DataSchema = new DataSchema(new String[]{"col1"},
      new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
  v2DataSchema = new DataSchema(new String[]{"col1"},
      new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});

  List<Object[]> v1Rows = new ArrayList<>();
  List<Object[]> v2Rows = new ArrayList<>();
  for (int i = 0; i < 100; i++) {
    Object[] row = new Object[]{i};
    if (i < 10) {
      v1Rows.add(row);
    }
    v2Rows.add(row);
  }
  v1BrokerResponse.setResultTable(new ResultTable(v1DataSchema, v1Rows));
  v2BrokerResponse.setResultTable(new ResultTable(v2DataSchema, v2Rows));

  Response selectionComparison = PinotClientRequest.getPinotQueryComparisonResponse(
      "SELECT col1 FROM mytable", v1BrokerResponse, v2BrokerResponse);
  comparisonResponse = (ObjectNode) selectionComparison.getEntity();
  comparisonAnalysis = mapper.readerFor(analysisType).readValue(comparisonResponse.get("comparisonAnalysis"));

  Assert.assertEquals(comparisonAnalysis.size(), 1);
  Assert.assertTrue(comparisonAnalysis.get(0).contains("Mismatch in number of rows returned"));
}
}

0 comments on commit 0e4a24a

Please sign in to comment.