Skip to content

Commit

Permalink
Add support for separate v1 and v2 SQL queries; add more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmayya committed Aug 27, 2024
1 parent 0e4a24a commit 485c9dd
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,28 +258,38 @@ public void processSqlQueryWithBothEnginesAndCompareResults(String query, @Suspe
@Context HttpHeaders httpHeaders) {
try {
JsonNode requestJson = JsonUtils.stringToJsonNode(query);
// TODO: Support inputting different v1 and v2 SQL queries
if (!requestJson.has(Request.SQL)) {
throw new IllegalStateException("Payload is missing the query string field 'sql'");
String v1Query;
String v2Query;
if (requestJson.has(Request.SQL)) {
v1Query = requestJson.get(Request.SQL).asText();
v2Query = v1Query;
} else if (requestJson.has(Request.V1SQL) && requestJson.has(Request.V2SQL)) {
v1Query = requestJson.get(Request.V1SQL).asText();
v2Query = requestJson.get(Request.V2SQL).asText();
} else {
throw new IllegalStateException("Payload should either contain the query string field '" + Request.SQL + "' "
+ "or both of '" + Request.V1SQL + "' and '" + Request.V2SQL + "'");
}

ObjectNode v1RequestJson = requestJson.deepCopy();
v1RequestJson.put(Request.SQL, v1Query);
CompletableFuture<BrokerResponse> v1Response = CompletableFuture.supplyAsync(
() -> {
try {
return executeSqlQuery((ObjectNode) requestJson, makeHttpIdentity(requestContext),
true, httpHeaders, false);
return executeSqlQuery(v1RequestJson, makeHttpIdentity(requestContext), true, httpHeaders, false);
} catch (Exception e) {
throw new RuntimeException(e);
}
},
_executor
);

ObjectNode v2RequestJson = requestJson.deepCopy();
v2RequestJson.put(Request.SQL, v2Query);
CompletableFuture<BrokerResponse> v2Response = CompletableFuture.supplyAsync(
() -> {
try {
return executeSqlQuery((ObjectNode) requestJson, makeHttpIdentity(requestContext),
true, httpHeaders, true);
return executeSqlQuery(v2RequestJson, makeHttpIdentity(requestContext), true, httpHeaders, true);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -289,8 +299,7 @@ public void processSqlQueryWithBothEnginesAndCompareResults(String query, @Suspe

CompletableFuture.allOf(v1Response, v2Response).join();

asyncResponse.resume(getPinotQueryComparisonResponse(requestJson.get(Request.SQL).asText(), v1Response.get(),
v2Response.get()));
asyncResponse.resume(getPinotQueryComparisonResponse(v1Query, v1Response.get(), v2Response.get()));
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
Expand Down Expand Up @@ -448,6 +457,7 @@ static Response getPinotQueryResponse(BrokerResponse brokerResponse)
.build();
}

@VisibleForTesting
static Response getPinotQueryComparisonResponse(String query, BrokerResponse v1Response, BrokerResponse v2Response) {
ObjectNode response = JsonUtils.newObjectNode();
response.set("v1Response", JsonUtils.objectToJsonNode(v1Response));
Expand All @@ -468,6 +478,20 @@ private static List<String> analyzeDifferences(String query, BrokerResponse v1Re
return differences;
}

if (v1Response.getResultTable() == null && v2Response.getResultTable() == null) {
return differences;
}

if (v1Response.getResultTable() == null) {
differences.add("v1 response has an empty result table");
return differences;
}

if (v2Response.getResultTable() == null) {
differences.add("v2 response has an empty result table");
return differences;
}

DataSchema.ColumnDataType[] v1ResponseTypes = v1Response.getResultTable().getDataSchema().getColumnDataTypes();
DataSchema.ColumnDataType[] v2ResponseTypes = v2Response.getResultTable().getDataSchema().getColumnDataTypes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,141 @@
package org.apache.pinot.broker.api.resources;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.glassfish.grizzly.http.server.Request;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.apache.pinot.common.exception.QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE;
import static org.apache.pinot.spi.utils.CommonConstants.Controller.PINOT_QUERY_ERROR_CODE_HEADER;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;


public class PinotClientRequestTest {

@Mock private SqlQueryExecutor _sqlQueryExecutor;
@Mock private BrokerRequestHandler _requestHandler;
@Mock private BrokerMetrics _brokerMetrics;
@Mock private Executor _executor;
@Mock private HttpClientConnectionManager _httpConnMgr;
@InjectMocks private PinotClientRequest _pinotClientRequest;

@BeforeMethod
public void setUp() {
MockitoAnnotations.openMocks(this);
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
}).when(_executor).execute(any(Runnable.class));
}

@Test
public void testGetPinotQueryResponse()
throws Exception {

// for successful query result the 'X-Pinot-Error-Code' should be -1
BrokerResponse emptyResultBrokerResponse = BrokerResponseNative.EMPTY_RESULT;
Response successfulResponse = PinotClientRequest.getPinotQueryResponse(emptyResultBrokerResponse);
Assert.assertEquals(successfulResponse.getStatus(), Response.Status.OK.getStatusCode());
assertEquals(successfulResponse.getStatus(), Response.Status.OK.getStatusCode());
Assert.assertTrue(successfulResponse.getHeaders().containsKey(PINOT_QUERY_ERROR_CODE_HEADER));
Assert.assertEquals(successfulResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).size(), 1);
Assert.assertEquals(successfulResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).get(0), -1);
assertEquals(successfulResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).size(), 1);
assertEquals(successfulResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).get(0), -1);

// for failed query result the 'X-Pinot-Error-Code' should be Error code fo exception.
BrokerResponse tableDoesNotExistBrokerResponse = BrokerResponseNative.TABLE_DOES_NOT_EXIST;
Response tableDoesNotExistResponse = PinotClientRequest.getPinotQueryResponse(tableDoesNotExistBrokerResponse);
Assert.assertEquals(tableDoesNotExistResponse.getStatus(), Response.Status.OK.getStatusCode());
assertEquals(tableDoesNotExistResponse.getStatus(), Response.Status.OK.getStatusCode());
Assert.assertTrue(tableDoesNotExistResponse.getHeaders().containsKey(PINOT_QUERY_ERROR_CODE_HEADER));
Assert.assertEquals(tableDoesNotExistResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).size(), 1);
Assert.assertEquals(tableDoesNotExistResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).get(0),
assertEquals(tableDoesNotExistResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).size(), 1);
assertEquals(tableDoesNotExistResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).get(0),
TABLE_DOES_NOT_EXIST_ERROR_CODE);
}

@Test
public void testPinotQueryComparisonApiSameQuery() throws Exception {
AsyncResponse asyncResponse = mock(AsyncResponse.class);
Request request = mock(Request.class);
when(request.getRequestURL()).thenReturn(new StringBuilder());
when(_requestHandler.handleRequest(any(), any(), any(), any(), any()))
.thenReturn(BrokerResponseNative.EMPTY_RESULT);
_pinotClientRequest.processSqlQueryWithBothEnginesAndCompareResults("{\"sql\": \"SELECT * FROM mytable\"}",
asyncResponse, request, null);

ArgumentCaptor<JsonNode> requestCaptor = ArgumentCaptor.forClass(JsonNode.class);
ArgumentCaptor<SqlNodeAndOptions> sqlNodeAndOptionsCaptor = ArgumentCaptor.forClass(SqlNodeAndOptions.class);
verify(_requestHandler, times(2)).handleRequest(requestCaptor.capture(), sqlNodeAndOptionsCaptor.capture(),
any(), any(), any());
verify(asyncResponse, times(1)).resume(any(Response.class));

assertEquals(requestCaptor.getAllValues().size(), 2);
assertEquals(requestCaptor.getAllValues().get(0).get("sql").asText(), "SELECT * FROM mytable");
assertEquals(requestCaptor.getAllValues().get(1).get("sql").asText(), "SELECT * FROM mytable");

assertEquals(sqlNodeAndOptionsCaptor.getAllValues().size(), 2);
assertFalse(sqlNodeAndOptionsCaptor.getAllValues().get(0).getOptions()
.containsKey(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE));
assertEquals(sqlNodeAndOptionsCaptor.getAllValues().get(1).getOptions()
.get(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE), "true");
}

@Test
public void testPinotQueryComparisonApiDifferentQuery() throws Exception {
AsyncResponse asyncResponse = mock(AsyncResponse.class);
Request request = mock(Request.class);
when(request.getRequestURL()).thenReturn(new StringBuilder());
when(_requestHandler.handleRequest(any(), any(), any(), any(), any()))
.thenReturn(BrokerResponseNative.EMPTY_RESULT);
_pinotClientRequest.processSqlQueryWithBothEnginesAndCompareResults("{\"v1sql\": \"SELECT v1 FROM mytable\","
+ "\"v2sql\": \"SELECT v2 FROM mytable\"}", asyncResponse, request, null);

ArgumentCaptor<JsonNode> requestCaptor = ArgumentCaptor.forClass(JsonNode.class);
verify(_requestHandler, times(2)).handleRequest(requestCaptor.capture(), any(), any(), any(), any());
verify(asyncResponse, times(1)).resume(any(Response.class));

assertEquals(requestCaptor.getAllValues().size(), 2);
assertEquals(requestCaptor.getAllValues().get(0).get("sql").asText(), "SELECT v1 FROM mytable");
assertEquals(requestCaptor.getAllValues().get(1).get("sql").asText(), "SELECT v2 FROM mytable");
}

@Test
public void testPinotQueryComparisonApiMissingSql() throws Exception {
AsyncResponse asyncResponse = mock(AsyncResponse.class);
Request request = mock(Request.class);
when(request.getRequestURL()).thenReturn(new StringBuilder());
// Both v1sql and v2sql should be present
_pinotClientRequest.processSqlQueryWithBothEnginesAndCompareResults("{\"v1sql\": \"SELECT v1 FROM mytable\"}",
asyncResponse, request, null);

verify(_requestHandler, never()).handleRequest(any(), any(), any(), any(), any());
verify(asyncResponse, times(1)).resume(any(Throwable.class));
}

@Test
public void testPinotQueryComparison() throws Exception {
// Aggregation type difference
Expand All @@ -79,7 +174,7 @@ public void testPinotQueryComparison() throws Exception {
List<String> comparisonAnalysis = new ObjectMapper().readerFor(new TypeReference<List<String>>() { })
.readValue(comparisonResponse.get("comparisonAnalysis"));

Assert.assertEquals(comparisonAnalysis.size(), 1);
assertEquals(comparisonAnalysis.size(), 1);
Assert.assertTrue(comparisonAnalysis.get(0).contains("v1 type: DOUBLE, v2 type: LONG"));

// Default limit in v1
Expand All @@ -104,7 +199,7 @@ public void testPinotQueryComparison() throws Exception {
comparisonAnalysis = new ObjectMapper().readerFor(new TypeReference<List<String>>() { })
.readValue(comparisonResponse.get("comparisonAnalysis"));

Assert.assertEquals(comparisonAnalysis.size(), 1);
assertEquals(comparisonAnalysis.size(), 1);
Assert.assertTrue(comparisonAnalysis.get(0).contains("Mismatch in number of rows returned"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ public static class Broker {

public static class Request {
public static final String SQL = "sql";
public static final String V1SQL = "v1sql";
public static final String V2SQL = "v2sql";
public static final String TRACE = "trace";
public static final String QUERY_OPTIONS = "queryOptions";

Expand Down

0 comments on commit 485c9dd

Please sign in to comment.