Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add broker API to run a query on both query engines and compare results #13746

Merged
merged 4 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -64,12 +67,16 @@
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
Expand Down Expand Up @@ -280,6 +287,83 @@ public void processTimeSeriesInstantQuery(@Suspended AsyncResponse asyncResponse
asyncResponse.resume(Response.ok().entity("{}").build());
}

@POST
@Produces(MediaType.APPLICATION_JSON)
@Path("query/compare")
@ApiOperation(value = "Query Pinot using both the single-stage query engine and the multi-stage query engine and "
    + "compare the results. The 'sql' field should be set in the request JSON to run the same query on both the "
    + "query engines. Set '" + Request.SQL_V1 + "' and '" + Request.SQL_V2 + "' if the query needs to be adapted for "
    + "the two query engines.")
@ApiResponses(value = {
    @ApiResponse(code = 200, message = "Query result comparison response"),
    @ApiResponse(code = 500, message = "Internal Server Error")
})
@ManualAuthorization
public void processSqlQueryWithBothEnginesAndCompareResults(String query, @Suspended AsyncResponse asyncResponse,
    @Context org.glassfish.grizzly.http.server.Request requestContext,
    @Context HttpHeaders httpHeaders) {
  try {
    JsonNode requestJson = JsonUtils.stringToJsonNode(query);
    String v1Query;
    String v2Query;

    if (!requestJson.has(Request.SQL)) {
      // No common query provided: both engine-specific queries are mandatory.
      if (!requestJson.has(Request.SQL_V1) || !requestJson.has(Request.SQL_V2)) {
        throw new IllegalStateException("Payload should either contain the query string field '" + Request.SQL + "' "
            + "or both of '" + Request.SQL_V1 + "' and '" + Request.SQL_V2 + "'");
      } else {
        v1Query = requestJson.get(Request.SQL_V1).asText();
        v2Query = requestJson.get(Request.SQL_V2).asText();
      }
    } else {
      // Common query provided: engine-specific overrides take precedence when present.
      v1Query = requestJson.has(Request.SQL_V1) ? requestJson.get(Request.SQL_V1).asText()
          : requestJson.get(Request.SQL).asText();
      v2Query = requestJson.has(Request.SQL_V2) ? requestJson.get(Request.SQL_V2).asText()
          : requestJson.get(Request.SQL).asText();
    }

    // Run the v1 (single-stage) query asynchronously on the broker executor.
    ObjectNode v1RequestJson = requestJson.deepCopy();
    v1RequestJson.put(Request.SQL, v1Query);
    CompletableFuture<BrokerResponse> v1Response = CompletableFuture.supplyAsync(
        () -> {
          try {
            return executeSqlQuery(v1RequestJson, makeHttpIdentity(requestContext), true, httpHeaders, false);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        },
        _executor
    );

    // Run the v2 (multi-stage) query asynchronously in parallel with the v1 query.
    ObjectNode v2RequestJson = requestJson.deepCopy();
    v2RequestJson.put(Request.SQL, v2Query);
    CompletableFuture<BrokerResponse> v2Response = CompletableFuture.supplyAsync(
        () -> {
          try {
            return executeSqlQuery(v2RequestJson, makeHttpIdentity(requestContext), true, httpHeaders, true);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        },
        _executor
    );

    // Wait for both engines to finish. join() is used instead of get() so that no checked
    // InterruptedException/ExecutionException is silently swallowed by the generic catch below; a failure in
    // either future surfaces as an unchecked CompletionException carrying the original cause.
    CompletableFuture.allOf(v1Response, v2Response).join();

    asyncResponse.resume(getPinotQueryComparisonResponse(v1Query, v1Response.join(), v2Response.join()));
  } catch (WebApplicationException wae) {
    // Already a well-formed HTTP error response; propagate as-is.
    asyncResponse.resume(wae);
  } catch (Exception e) {
    LOGGER.error("Caught exception while processing request", e);
    asyncResponse.resume(
        new WebApplicationException(e,
            Response
                .status(Response.Status.INTERNAL_SERVER_ERROR)
                .entity(e.getMessage())
                .build()));
  }
}

@DELETE
@Path("query/{queryId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
Expand Down Expand Up @@ -356,7 +440,7 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
PinotSqlType sqlType = sqlNodeAndOptions.getSqlType();
if (onlyDql && sqlType != PinotSqlType.DQL) {
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
new UnsupportedOperationException("Unsupported SQL type - " + sqlType + ", GET API only supports DQL.")));
new UnsupportedOperationException("Unsupported SQL type - " + sqlType + ", this API only supports DQL.")));
}
switch (sqlType) {
case DQL:
Expand Down Expand Up @@ -428,4 +512,92 @@ static Response getPinotQueryResponse(BrokerResponse brokerResponse)
.entity((StreamingOutput) brokerResponse::toOutputStream).type(MediaType.APPLICATION_JSON)
.build();
}

@VisibleForTesting
static Response getPinotQueryComparisonResponse(String query, BrokerResponse v1Response, BrokerResponse v2Response) {
  // Bundle both raw engine responses together with the computed difference analysis so the caller can
  // inspect the full payloads alongside the summary.
  ObjectNode payload = JsonUtils.newObjectNode();
  payload.set("v1Response", JsonUtils.objectToJsonNode(v1Response));
  payload.set("v2Response", JsonUtils.objectToJsonNode(v2Response));
  List<String> differences = analyzeQueryResultDifferences(query, v1Response, v2Response);
  payload.set("comparisonAnalysis", JsonUtils.objectToJsonNode(differences));

  return Response.ok()
      .header(PINOT_QUERY_ERROR_CODE_HEADER, -1)
      .type(MediaType.APPLICATION_JSON)
      .entity(payload)
      .build();
}

/**
* Given a query and the responses from the single-stage and multi-stage query engines, analyzes the differences
* between the responses and returns a list of differences. Currently, the method only compares the column names,
* column types, number of rows in the result set, and the aggregation values for aggregation-only queries.
*
* TODO: Add more comparison logic for different query types. This would require handling edge cases with group
* trimming, non-deterministic results for order by queries with limits etc.
*/
private static List<String> analyzeQueryResultDifferences(String query, BrokerResponse v1Response,
BrokerResponse v2Response) {
List<String> differences = new ArrayList<>();

if (v1Response.getExceptionsSize() != 0 || v2Response.getExceptionsSize() != 0) {
differences.add("Exception encountered while running the query on one or both query engines");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Add exception messages to help out the user ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query response itself contains the entire v1 and v2 responses which will have the exceptions. It seems redundant to duplicate that here?

return differences;
}

if (v1Response.getResultTable() == null && v2Response.getResultTable() == null) {
return differences;
}

if (v1Response.getResultTable() == null) {
differences.add("v1 response has an empty result table");
return differences;
}

if (v2Response.getResultTable() == null) {
differences.add("v2 response has an empty result table");
return differences;
}

DataSchema.ColumnDataType[] v1ResponseTypes = v1Response.getResultTable().getDataSchema().getColumnDataTypes();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to recognise cases where column ordering is different? I dont know if there are cases where the engines may return the result in different column orders.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not aware of any such cases where the column ordering differs in the two engines. However, we do check for column data type mismatches.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIR any simple select query without group by can return different results, even using the same engine. This should be more frequent if there are several segments involved in the query. In fact we would like to verify order if the query is order by and do not do that in the other case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Column data type check will identify it. IME, if there is a query with 20 columns, there will be 20 messages in the differences array while the real problem is just that columns are ordered differently. Anyway - it is unclear if this issue exists and can be detected as an improvement in the future.

DataSchema.ColumnDataType[] v2ResponseTypes = v2Response.getResultTable().getDataSchema().getColumnDataTypes();

if (v1ResponseTypes.length != v2ResponseTypes.length) {
differences.add("Mismatch in number of columns returned. v1: " + v1ResponseTypes.length
+ ", v2: " + v2ResponseTypes.length);
return differences;
}

String[] v1ColumnNames = v1Response.getResultTable().getDataSchema().getColumnNames();
String[] v2ColumnNames = v2Response.getResultTable().getDataSchema().getColumnNames();
for (int i = 0; i < v1ResponseTypes.length; i++) {
if (v1ResponseTypes[i] != v2ResponseTypes[i]) {
String columnName = v1ColumnNames[i].equals(v2ColumnNames[i])
? v1ColumnNames[i]
: v1ColumnNames[i] + " / " + v2ColumnNames[i];
differences.add("Mismatch in column data type for column with name " + columnName
+ ". v1 type: " + v1ResponseTypes[i] + ", v2 type: " + v2ResponseTypes[i]);
}
}

if (v1Response.getNumRowsResultSet() != v2Response.getNumRowsResultSet()) {
differences.add("Mismatch in number of rows returned. v1: " + v1Response.getNumRowsResultSet()
+ ", v2: " + v2Response.getNumRowsResultSet());
return differences;
}

QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query);
if (QueryContextUtils.isAggregationQuery(queryContext) && queryContext.getGroupByExpressions() == null) {
// Aggregation-only query with a single row output
for (int i = 0; i < v1ColumnNames.length; i++) {
if (!Objects.equals(v1Response.getResultTable().getRows().get(0)[i],
v2Response.getResultTable().getRows().get(0)[i])) {
differences.add("Mismatch in aggregation value for " + v1ColumnNames[i]
+ ". v1 value: " + v1Response.getResultTable().getRows().get(0)[i]
+ ", v2 value: " + v2Response.getResultTable().getRows().get(0)[i]);
}
}
}

return differences;
}
}
Loading
Loading