Add broker API to run a query on both query engines and compare results #13746

Merged
merged 4 commits into from
Sep 30, 2024
Add broker API to run a query on both query engines and compare results
yashmayya committed Sep 24, 2024
commit 3e96471f78e64ed18e2bf9fd73ae0bbe8566848c
@@ -33,9 +33,12 @@
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
@@ -64,13 +67,17 @@
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
@@ -280,6 +287,67 @@ public void processTimeSeriesInstantQuery(@Suspended AsyncResponse asyncResponse
asyncResponse.resume(Response.ok().entity("{}").build());
}

@POST
@Produces(MediaType.APPLICATION_JSON)
@Path("query/compare")
@ApiOperation(value = "Query Pinot using both the single stage query engine and the multi stage query engine and "
Collaborator

I am using the Swagger APIs quite a bit today. Setting sql vs v1Sql/v2Sql may be confusing. The majority of users will rely only on the doc in the Swagger page to use the API. There are a couple of options to reduce confusion:

  • Only provide v1Sql/v2Sql. It's not that hard to copy and paste twice.
  • Change the one-line documentation to add more info. There is precedent for multi-line descriptions in the Swagger page. For example:

Query Pinot using both the single stage query engine and the multi stage query engine and compare the results. Set sql field to run the same query in both engines. Set v1Sql & v2Sql instead if query text is different.

Collaborator Author

@yashmayya yashmayya Sep 23, 2024

Good point, I've gone with your option 2 since ideally we'd want most single-stage engine queries to work as is on the multi-stage query engine.

+ "compare the results")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Query result comparison response"),
@ApiResponse(code = 500, message = "Internal Server Error")
})
@ManualAuthorization
public void processSqlQueryWithBothEnginesAndCompareResults(String query, @Suspended AsyncResponse asyncResponse,
@Context org.glassfish.grizzly.http.server.Request requestContext,
@Context HttpHeaders httpHeaders) {
try {
JsonNode requestJson = JsonUtils.stringToJsonNode(query);
// TODO: Support inputting different v1 and v2 SQL queries
if (!requestJson.has(Request.SQL)) {
throw new IllegalStateException("Payload is missing the query string field 'sql'");
}

CompletableFuture<BrokerResponse> v1Response = CompletableFuture.supplyAsync(
() -> {
try {
return executeSqlQuery((ObjectNode) requestJson, makeHttpIdentity(requestContext),
true, httpHeaders, false);
} catch (Exception e) {
throw new RuntimeException(e);
}
},
_executor
);

CompletableFuture<BrokerResponse> v2Response = CompletableFuture.supplyAsync(
() -> {
try {
return executeSqlQuery((ObjectNode) requestJson, makeHttpIdentity(requestContext),
true, httpHeaders, true);
} catch (Exception e) {
throw new RuntimeException(e);
}
},
_executor
);

CompletableFuture.allOf(v1Response, v2Response).join();

asyncResponse.resume(getPinotQueryComparisonResponse(requestJson.get(Request.SQL).asText(), v1Response.get(),
v2Response.get()));
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
LOGGER.error("Caught exception while processing request", e);
asyncResponse.resume(
new WebApplicationException(e,
Response
.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(e.getMessage())
.build()));
}
}
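
The handler above sends the same request to both engines with CompletableFuture.supplyAsync and then waits for both with allOf(...).join(). A minimal, self-contained sketch of that pattern follows. ParallelEngineRunSketch, Pair, and the two engine functions are hypothetical stand-ins for executeSqlQuery and BrokerResponse; they are not Pinot APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class ParallelEngineRunSketch {
  /** Holds the two results; a hypothetical stand-in for the pair of BrokerResponse objects. */
  static final class Pair<T> {
    final T v1;
    final T v2;

    Pair(T v1, T v2) {
      this.v1 = v1;
      this.v2 = v2;
    }
  }

  /** Runs the same query through two (hypothetical) engine functions in parallel and waits for both. */
  static <T> Pair<T> runOnBoth(String sql, Function<String, T> v1Engine, Function<String, T> v2Engine,
      Executor executor) {
    CompletableFuture<T> v1 = CompletableFuture.supplyAsync(() -> v1Engine.apply(sql), executor);
    CompletableFuture<T> v2 = CompletableFuture.supplyAsync(() -> v2Engine.apply(sql), executor);
    // allOf completes once both futures are done; join() rethrows a failure in
    // either task as a CompletionException wrapping the original exception.
    CompletableFuture.allOf(v1, v2).join();
    // Both futures are complete here, so these join() calls do not block.
    return new Pair<>(v1.join(), v2.join());
  }

  public static void main(String[] args) {
    // A two-thread pool is enough for this demo; the broker uses its own executor.
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      Pair<String> result =
          runOnBoth("SELECT COUNT(*) FROM mytable", sql -> "v1:" + sql, sql -> "v2:" + sql, executor);
      System.out.println(result.v1);
      System.out.println(result.v2);
    } finally {
      executor.shutdown();
    }
  }
}
```

Because both tasks are submitted before either is awaited, the total wait is roughly that of the slower engine rather than the sum of the two.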

@DELETE
@Path("query/{queryId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
@@ -356,7 +424,7 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
PinotSqlType sqlType = sqlNodeAndOptions.getSqlType();
if (onlyDql && sqlType != PinotSqlType.DQL) {
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
- new UnsupportedOperationException("Unsupported SQL type - " + sqlType + ", GET API only supports DQL.")));
+ new UnsupportedOperationException("Unsupported SQL type - " + sqlType + ", this API only supports DQL.")));
}
switch (sqlType) {
case DQL:
@@ -428,4 +496,70 @@ static Response getPinotQueryResponse(BrokerResponse brokerResponse)
.entity((StreamingOutput) brokerResponse::toOutputStream).type(MediaType.APPLICATION_JSON)
.build();
}

static Response getPinotQueryComparisonResponse(String query, BrokerResponse v1Response, BrokerResponse v2Response) {
ObjectNode response = JsonUtils.newObjectNode();
response.set("v1Response", JsonUtils.objectToJsonNode(v1Response));
response.set("v2Response", JsonUtils.objectToJsonNode(v2Response));
response.set("comparisonAnalysis", JsonUtils.objectToJsonNode(analyzeDifferences(query, v1Response, v2Response)));

return Response.ok()
.header(PINOT_QUERY_ERROR_CODE_HEADER, -1)
.entity(response).type(MediaType.APPLICATION_JSON)
.build();
}
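
The response assembled above carries three top-level fields: v1Response, v2Response, and comparisonAnalysis. For context, a client request to the new endpoint could be built roughly as sketched below. The broker base URL (http://localhost:8099) and the assumption that the resource is mounted at the root path are illustrative guesses, not taken from this diff; CompareRequestSketch and its methods are hypothetical names. Authentication headers, which the @ManualAuthorization handler may require, are not covered.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class CompareRequestSketch {
  /** Builds the JSON payload with the single 'sql' field the handler checks for. */
  static String buildBody(String sql) {
    // Minimal escaping of backslashes and double quotes only; a real client
    // would use a JSON library instead of string concatenation.
    String escaped = sql.replace("\\", "\\\\").replace("\"", "\\\"");
    return "{\"sql\": \"" + escaped + "\"}";
  }

  /** Builds (but does not send) a POST request for the query/compare endpoint. */
  static HttpRequest buildCompareRequest(String brokerBaseUrl, String sql) {
    return HttpRequest.newBuilder(URI.create(brokerBaseUrl + "/query/compare"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(buildBody(sql)))
        .build();
  }

  public static void main(String[] args) {
    // Assumed broker address, for illustration only.
    HttpRequest request = buildCompareRequest("http://localhost:8099", "SELECT SUM(col) FROM mytable");
    System.out.println(request.method() + " " + request.uri());
  }
}
```

Sending the request with java.net.http.HttpClient and reading comparisonAnalysis from the JSON response is left out to keep the sketch free of network access.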

private static List<String> analyzeDifferences(String query, BrokerResponse v1Response, BrokerResponse v2Response) {
List<String> differences = new ArrayList<>();

if (v1Response.getExceptionsSize() != 0 || v2Response.getExceptionsSize() != 0) {
differences.add("Exception encountered while running the query on one or both query engines");
Collaborator

nit: Add exception messages to help out the user?

Collaborator Author

The query response itself contains the entire v1 and v2 responses which will have the exceptions. It seems redundant to duplicate that here?

return differences;
}

DataSchema.ColumnDataType[] v1ResponseTypes = v1Response.getResultTable().getDataSchema().getColumnDataTypes();
Collaborator

Do you want to recognise cases where column ordering is different? I don't know if there are cases where the engines may return the result in different column orders.

Collaborator Author

I'm not aware of any such cases where the column ordering differs in the two engines. However, we do check for column data type mismatches.

Contributor

AFAIR, any simple SELECT query without GROUP BY can return different results, even when using the same engine. This should be more frequent if several segments are involved in the query. In fact, we would want to verify row order only when the query has an ORDER BY, and skip that check otherwise.

Collaborator

The column data type check will identify it. IME, if there is a query with 20 columns, there will be 20 messages in the differences array while the real problem is just that the columns are ordered differently. Anyway, it is unclear whether this issue exists, and detecting it can be added as an improvement in the future.
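
As a rough illustration of the future improvement discussed in this thread, the sketch below reports a pure column reordering as one condition instead of one message per column. It is not part of the PR. ColumnOrderCheckSketch is a hypothetical name, and plain strings stand in for DataSchema.ColumnDataType. It also assumes both engines use the same column names, which the test in this PR shows is not always true (sum(col) vs EXPR$0), so it would only catch the matching-names case.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ColumnOrderCheckSketch {
  /** Returns true when both schemas hold the same (name, type) pairs but in a different order. */
  static boolean isPureReordering(String[] v1Names, String[] v1Types, String[] v2Names, String[] v2Types) {
    List<String> v1Columns = describe(v1Names, v1Types);
    List<String> v2Columns = describe(v2Names, v2Types);
    if (v1Columns.equals(v2Columns)) {
      return false; // Identical schemas, including order: nothing to report.
    }
    // Sort copies so that only the order, not the content, is ignored.
    List<String> v1Sorted = new ArrayList<>(v1Columns);
    List<String> v2Sorted = new ArrayList<>(v2Columns);
    Collections.sort(v1Sorted);
    Collections.sort(v2Sorted);
    return v1Sorted.equals(v2Sorted);
  }

  /** Joins each column name with its type; assumes names and types have the same length. */
  private static List<String> describe(String[] names, String[] types) {
    List<String> columns = new ArrayList<>(names.length);
    for (int i = 0; i < names.length; i++) {
      columns.add(names[i] + ":" + types[i]);
    }
    return columns;
  }

  public static void main(String[] args) {
    System.out.println(isPureReordering(
        new String[]{"a", "b"}, new String[]{"INT", "STRING"},
        new String[]{"b", "a"}, new String[]{"STRING", "INT"}));
  }
}
```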

DataSchema.ColumnDataType[] v2ResponseTypes = v2Response.getResultTable().getDataSchema().getColumnDataTypes();

if (v1ResponseTypes.length != v2ResponseTypes.length) {
differences.add("Mismatch in number of columns returned. v1: " + v1ResponseTypes.length
+ ", v2: " + v2ResponseTypes.length);
return differences;
}

String[] v1ColumnNames = v1Response.getResultTable().getDataSchema().getColumnNames();
String[] v2ColumnNames = v2Response.getResultTable().getDataSchema().getColumnNames();
for (int i = 0; i < v1ResponseTypes.length; i++) {
if (v1ResponseTypes[i] != v2ResponseTypes[i]) {
String columnName = v1ColumnNames[i].equals(v2ColumnNames[i])
? v1ColumnNames[i]
: v1ColumnNames[i] + " / " + v2ColumnNames[i];
differences.add("Mismatch in column data type for column with name " + columnName
+ ". v1 type: " + v1ResponseTypes[i] + ", v2 type: " + v2ResponseTypes[i]);
}
}

if (v1Response.getNumRowsResultSet() != v2Response.getNumRowsResultSet()) {
differences.add("Mismatch in number of rows returned. v1: " + v1Response.getNumRowsResultSet()
+ ", v2: " + v2Response.getNumRowsResultSet());
return differences;
}

QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query);
if (QueryContextUtils.isAggregationQuery(queryContext) && queryContext.getGroupByExpressions() == null) {
// Aggregation-only query with a single row output
for (int i = 0; i < v1ColumnNames.length; i++) {
if (!Objects.equals(v1Response.getResultTable().getRows().get(0)[i],
v2Response.getResultTable().getRows().get(0)[i])) {
differences.add("Mismatch in aggregation value for " + v1ColumnNames[i]
+ ". v1 value: " + v1Response.getResultTable().getRows().get(0)[i]
+ ", v2 value: " + v2Response.getResultTable().getRows().get(0)[i]);
}
}
}

// TODO: Compare response row values if it makes sense for the query type. Handle edge cases with group trimming,
Collaborator

nit: document as a javadoc?

// non-deterministic results for order by queries with limits etc.

return differences;
}
}
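
The TODO in analyzeDifferences leaves row value comparison open. One possible direction for queries without ORDER BY is to compare the rows as a multiset, so that ordering differences between the engines are not flagged. The sketch below is only an illustration under that assumption. UnorderedRowCompareSketch is a hypothetical name, and the approach does not address the edge cases the TODO mentions (group trimming, or LIMIT queries where the engines may legitimately return different rows).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UnorderedRowCompareSketch {
  /** Returns true when both results contain the same rows with the same multiplicities, in any order. */
  static boolean sameRowsIgnoringOrder(List<Object[]> v1Rows, List<Object[]> v2Rows) {
    if (v1Rows.size() != v2Rows.size()) {
      return false;
    }
    // Count each v1 row. Arrays.asList gives element-wise equals/hashCode, which
    // Object[] itself lacks. Nested arrays (e.g. multi-value columns) would still
    // compare by reference and need extra handling.
    Map<List<Object>, Integer> counts = new HashMap<>();
    for (Object[] row : v1Rows) {
      counts.merge(Arrays.asList(row), 1, Integer::sum);
    }
    // Consume one count per v2 row; a missing key means v2 has an extra row.
    for (Object[] row : v2Rows) {
      List<Object> key = Arrays.asList(row);
      Integer remaining = counts.get(key);
      if (remaining == null) {
        return false;
      }
      if (remaining == 1) {
        counts.remove(key);
      } else {
        counts.put(key, remaining - 1);
      }
    }
    return counts.isEmpty();
  }

  public static void main(String[] args) {
    List<Object[]> v1 = List.<Object[]>of(new Object[]{1, "x"}, new Object[]{2, "y"});
    List<Object[]> v2 = List.<Object[]>of(new Object[]{2, "y"}, new Object[]{1, "x"});
    System.out.println(sameRowsIgnoringOrder(v1, v2));
  }
}
```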
@@ -18,9 +18,16 @@
*/
package org.apache.pinot.broker.api.resources;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.core.Response;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.testng.Assert;
import org.testng.annotations.Test;

@@ -51,4 +58,53 @@ public void testGetPinotQueryResponse()
Assert.assertEquals(tableDoesNotExistResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).get(0),
TABLE_DOES_NOT_EXIST_ERROR_CODE);
}

@Test
public void testPinotQueryComparison() throws Exception {
// Aggregation type difference

BrokerResponse v1BrokerResponse = new BrokerResponseNative();
DataSchema v1DataSchema = new DataSchema(new String[]{"sum(col)"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE});
v1BrokerResponse.setResultTable(new ResultTable(v1DataSchema, List.<Object[]>of(new Object[]{1234})));

BrokerResponse v2BrokerResponse = new BrokerResponseNative();
DataSchema v2DataSchema = new DataSchema(new String[]{"EXPR$0"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
v2BrokerResponse.setResultTable(new ResultTable(v2DataSchema, List.<Object[]>of(new Object[]{1234})));

ObjectNode comparisonResponse = (ObjectNode) PinotClientRequest.getPinotQueryComparisonResponse(
"SELECT SUM(col) FROM mytable", v1BrokerResponse, v2BrokerResponse).getEntity();

List<String> comparisonAnalysis = new ObjectMapper().readerFor(new TypeReference<List<String>>() { })
.readValue(comparisonResponse.get("comparisonAnalysis"));

Assert.assertEquals(comparisonAnalysis.size(), 1);
Assert.assertTrue(comparisonAnalysis.get(0).contains("v1 type: DOUBLE, v2 type: LONG"));

// Default limit in v1

v1DataSchema = new DataSchema(new String[]{"col1"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
v2DataSchema = new DataSchema(new String[]{"col1"},
new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
List<Object[]> rows = new ArrayList<>();
for (int i = 0; i < 10; i++) {
rows.add(new Object[]{i});
}
v1BrokerResponse.setResultTable(new ResultTable(v1DataSchema, new ArrayList<>(rows)));
for (int i = 10; i < 100; i++) {
rows.add(new Object[]{i});
}
v2BrokerResponse.setResultTable(new ResultTable(v2DataSchema, new ArrayList<>(rows)));

comparisonResponse = (ObjectNode) PinotClientRequest.getPinotQueryComparisonResponse(
"SELECT col1 FROM mytable", v1BrokerResponse, v2BrokerResponse).getEntity();

comparisonAnalysis = new ObjectMapper().readerFor(new TypeReference<List<String>>() { })
.readValue(comparisonResponse.get("comparisonAnalysis"));

Assert.assertEquals(comparisonAnalysis.size(), 1);
Assert.assertTrue(comparisonAnalysis.get(0).contains("Mismatch in number of rows returned"));
}
}
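
In the first test case above, both result tables hold the Integer 1234, so the Objects.equals check in analyzeDifferences passes and only the type mismatch is reported. If the two engines instead returned different Number subclasses for the same value (an assumption about real responses, not something this diff shows), Objects.equals would flag a mismatch even though the values are numerically equal. A hedged sketch of a numeric-aware comparison follows; NumericAwareEqualsSketch and valuesMatch are hypothetical names, not part of the PR.

```java
import java.math.BigDecimal;
import java.util.Objects;

class NumericAwareEqualsSketch {
  /** Compares two cell values, treating numerically equal Numbers of different types as equal. */
  static boolean valuesMatch(Object v1Value, Object v2Value) {
    if (v1Value instanceof Number && v2Value instanceof Number
        && isFinite((Number) v1Value) && isFinite((Number) v2Value)) {
      // BigDecimal.compareTo ignores scale, so 1234 and 1234.0 compare as equal.
      return toBigDecimal((Number) v1Value).compareTo(toBigDecimal((Number) v2Value)) == 0;
    }
    // Non-numeric values, nulls, NaN and infinities fall back to plain equality.
    return Objects.equals(v1Value, v2Value);
  }

  private static boolean isFinite(Number n) {
    return !(n instanceof Double || n instanceof Float) || Double.isFinite(n.doubleValue());
  }

  private static BigDecimal toBigDecimal(Number n) {
    // Relies on the standard Number subclasses, whose toString() yields a parseable decimal.
    return new BigDecimal(n.toString());
  }

  public static void main(String[] args) {
    System.out.println(Objects.equals(1234, 1234L));
    System.out.println(valuesMatch(1234, 1234L));
  }
}
```

Whether a DOUBLE vs LONG result should count as matching is a policy decision; the type mismatch message would still be reported separately.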