diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java index fd3f58fbf25d..acc26604b839 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java @@ -33,9 +33,12 @@ import io.swagger.annotations.Authorization; import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import javax.inject.Inject; import javax.ws.rs.DELETE; @@ -63,12 +66,16 @@ import org.apache.pinot.common.response.BrokerResponse; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.QueryProcessingException; +import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.Authorize; import org.apache.pinot.core.auth.ManualAuthorization; import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; import org.apache.pinot.spi.trace.RequestScope; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request; @@ -236,6 +243,76 @@ public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended Asy } } + @POST + @Produces(MediaType.APPLICATION_JSON) + @Path("query/compare") + @ApiOperation(value = "Query Pinot using both the single stage query engine and the multi stage query engine and " + + "compare the results") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Query result comparison response"), + @ApiResponse(code = 500, message = "Internal Server Error") + }) + @ManualAuthorization + public void processSqlQueryWithBothEnginesAndCompareResults(String query, @Suspended AsyncResponse asyncResponse, + @Context org.glassfish.grizzly.http.server.Request requestContext, + @Context HttpHeaders httpHeaders) { + try { + JsonNode requestJson = JsonUtils.stringToJsonNode(query); + String v1Query; + String v2Query; + if (requestJson.has(Request.SQL)) { + v1Query = requestJson.get(Request.SQL).asText(); + v2Query = v1Query; + } else if (requestJson.has(Request.V1SQL) && requestJson.has(Request.V2SQL)) { + v1Query = requestJson.get(Request.V1SQL).asText(); + v2Query = requestJson.get(Request.V2SQL).asText(); + } else { + throw new IllegalStateException("Payload should either contain the query string field '" + Request.SQL + "' " + + "or both of '" + Request.V1SQL + "' and '" + Request.V2SQL + "'"); + } + + ObjectNode v1RequestJson = requestJson.deepCopy(); + v1RequestJson.put(Request.SQL, v1Query); + CompletableFuture v1Response = CompletableFuture.supplyAsync( + () -> { + try { + return executeSqlQuery(v1RequestJson, makeHttpIdentity(requestContext), true, httpHeaders, false); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, + _executor + ); + + ObjectNode v2RequestJson = 
requestJson.deepCopy(); + v2RequestJson.put(Request.SQL, v2Query); + CompletableFuture v2Response = CompletableFuture.supplyAsync( + () -> { + try { + return executeSqlQuery(v2RequestJson, makeHttpIdentity(requestContext), true, httpHeaders, true); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, + _executor + ); + + CompletableFuture.allOf(v1Response, v2Response).join(); + + asyncResponse.resume(getPinotQueryComparisonResponse(v1Query, v1Response.get(), v2Response.get())); + } catch (WebApplicationException wae) { + asyncResponse.resume(wae); + } catch (Exception e) { + LOGGER.error("Caught exception while processing request", e); + asyncResponse.resume( + new WebApplicationException(e, + Response + .status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(e.getMessage()) + .build())); + } + } + @DELETE @Path("query/{queryId}") @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY) @@ -312,7 +389,7 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI PinotSqlType sqlType = sqlNodeAndOptions.getSqlType(); if (onlyDql && sqlType != PinotSqlType.DQL) { return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, - new UnsupportedOperationException("Unsupported SQL type - " + sqlType + ", GET API only supports DQL."))); + new UnsupportedOperationException("Unsupported SQL type - " + sqlType + ", this API only supports DQL."))); } switch (sqlType) { case DQL: @@ -379,4 +456,85 @@ static Response getPinotQueryResponse(BrokerResponse brokerResponse) .entity((StreamingOutput) brokerResponse::toOutputStream).type(MediaType.APPLICATION_JSON) .build(); } + + @VisibleForTesting + static Response getPinotQueryComparisonResponse(String query, BrokerResponse v1Response, BrokerResponse v2Response) { + ObjectNode response = JsonUtils.newObjectNode(); + response.set("v1Response", JsonUtils.objectToJsonNode(v1Response)); + response.set("v2Response", JsonUtils.objectToJsonNode(v2Response)); + response.set("comparisonAnalysis", JsonUtils.objectToJsonNode(analyzeDifferences(query, v1Response, v2Response))); + + return Response.ok() + .header(PINOT_QUERY_ERROR_CODE_HEADER, -1) + .entity(response).type(MediaType.APPLICATION_JSON) + .build(); + } + + private static List analyzeDifferences(String query, BrokerResponse v1Response, BrokerResponse v2Response) { + List differences = new ArrayList<>(); + + if (v1Response.getExceptionsSize() != 0 || v2Response.getExceptionsSize() != 0) { + differences.add("Exception encountered while running the query on one or both query engines"); + return differences; + } + + if (v1Response.getResultTable() == null && v2Response.getResultTable() == null) { + return differences; + } + + if (v1Response.getResultTable() == null) { + differences.add("v1 response has an empty result table"); + return differences; + } + + if (v2Response.getResultTable() == null) { + differences.add("v2 response has an empty result table"); + return differences; + } + + DataSchema.ColumnDataType[] v1ResponseTypes = v1Response.getResultTable().getDataSchema().getColumnDataTypes(); + DataSchema.ColumnDataType[] v2ResponseTypes = v2Response.getResultTable().getDataSchema().getColumnDataTypes(); + + if (v1ResponseTypes.length != v2ResponseTypes.length) { + differences.add("Mismatch in number of columns returned. 
v1: " + v1ResponseTypes.length + + ", v2: " + v2ResponseTypes.length); + return differences; + } + + String[] v1ColumnNames = v1Response.getResultTable().getDataSchema().getColumnNames(); + String[] v2ColumnNames = v2Response.getResultTable().getDataSchema().getColumnNames(); + for (int i = 0; i < v1ResponseTypes.length; i++) { + if (v1ResponseTypes[i] != v2ResponseTypes[i]) { + String columnName = v1ColumnNames[i].equals(v2ColumnNames[i]) + ? v1ColumnNames[i] + : v1ColumnNames[i] + " / " + v2ColumnNames[i]; + differences.add("Mismatch in column data type for column with name " + columnName + + ". v1 type: " + v1ResponseTypes[i] + ", v2 type: " + v2ResponseTypes[i]); + } + } + + if (v1Response.getNumRowsResultSet() != v2Response.getNumRowsResultSet()) { + differences.add("Mismatch in number of rows returned. v1: " + v1Response.getNumRowsResultSet() + + ", v2: " + v2Response.getNumRowsResultSet()); + return differences; + } + + QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query); + if (QueryContextUtils.isAggregationQuery(queryContext) && queryContext.getGroupByExpressions() == null) { + // Aggregation-only query with a single row output + for (int i = 0; i < v1ColumnNames.length; i++) { + if (!Objects.equals(v1Response.getResultTable().getRows().get(0)[i], + v2Response.getResultTable().getRows().get(0)[i])) { + differences.add("Mismatch in aggregation value for " + v1ColumnNames[i] + + ". v1 value: " + v1Response.getResultTable().getRows().get(0)[i] + + ", v2 value: " + v2Response.getResultTable().getRows().get(0)[i]); + } + } + } + + // TODO: Compare response row values if it makes sense for the query type. Handle edge cases with group trimming, + // non-deterministic results for order by queries with limits etc. + + return differences; + } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/api/resources/PinotClientRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/api/resources/PinotClientRequestTest.java index e79b20c57b71..b772f3340e8b 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/api/resources/PinotClientRequestTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/api/resources/PinotClientRequestTest.java @@ -18,18 +18,61 @@ */ package org.apache.pinot.broker.api.resources; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; +import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response; +import org.apache.hc.client5.http.io.HttpClientConnectionManager; +import org.apache.pinot.broker.requesthandler.BrokerRequestHandler; +import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.response.BrokerResponse; import org.apache.pinot.common.response.broker.BrokerResponseNative; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.sql.parsers.SqlNodeAndOptions; +import org.glassfish.grizzly.http.server.Request; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.testng.Assert; +import org.testng.annotations.BeforeMethod; 
import org.testng.annotations.Test; import static org.apache.pinot.common.exception.QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE; import static org.apache.pinot.spi.utils.CommonConstants.Controller.PINOT_QUERY_ERROR_CODE_HEADER; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; public class PinotClientRequestTest { + @Mock private SqlQueryExecutor _sqlQueryExecutor; + @Mock private BrokerRequestHandler _requestHandler; + @Mock private BrokerMetrics _brokerMetrics; + @Mock private Executor _executor; + @Mock private HttpClientConnectionManager _httpConnMgr; + @InjectMocks private PinotClientRequest _pinotClientRequest; + + @BeforeMethod + public void setUp() { + MockitoAnnotations.openMocks(this); + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(_executor).execute(any(Runnable.class)); + } + @Test public void testGetPinotQueryResponse() throws Exception { @@ -37,18 +80,126 @@ public void testGetPinotQueryResponse() // for successful query result the 'X-Pinot-Error-Code' should be -1 BrokerResponse emptyResultBrokerResponse = BrokerResponseNative.EMPTY_RESULT; Response successfulResponse = PinotClientRequest.getPinotQueryResponse(emptyResultBrokerResponse); - Assert.assertEquals(successfulResponse.getStatus(), Response.Status.OK.getStatusCode()); + assertEquals(successfulResponse.getStatus(), Response.Status.OK.getStatusCode()); Assert.assertTrue(successfulResponse.getHeaders().containsKey(PINOT_QUERY_ERROR_CODE_HEADER)); - Assert.assertEquals(successfulResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).size(), 1); - Assert.assertEquals(successfulResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).get(0), -1); + assertEquals(successfulResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).size(), 1); + assertEquals(successfulResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).get(0), -1); // for failed query result the 'X-Pinot-Error-Code' should be Error code fo exception. 
BrokerResponse tableDoesNotExistBrokerResponse = BrokerResponseNative.TABLE_DOES_NOT_EXIST; Response tableDoesNotExistResponse = PinotClientRequest.getPinotQueryResponse(tableDoesNotExistBrokerResponse); - Assert.assertEquals(tableDoesNotExistResponse.getStatus(), Response.Status.OK.getStatusCode()); + assertEquals(tableDoesNotExistResponse.getStatus(), Response.Status.OK.getStatusCode()); Assert.assertTrue(tableDoesNotExistResponse.getHeaders().containsKey(PINOT_QUERY_ERROR_CODE_HEADER)); - Assert.assertEquals(tableDoesNotExistResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).size(), 1); - Assert.assertEquals(tableDoesNotExistResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).get(0), + assertEquals(tableDoesNotExistResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).size(), 1); + assertEquals(tableDoesNotExistResponse.getHeaders().get(PINOT_QUERY_ERROR_CODE_HEADER).get(0), TABLE_DOES_NOT_EXIST_ERROR_CODE); } + + @Test + public void testPinotQueryComparisonApiSameQuery() throws Exception { + AsyncResponse asyncResponse = mock(AsyncResponse.class); + Request request = mock(Request.class); + when(request.getRequestURL()).thenReturn(new StringBuilder()); + when(_requestHandler.handleRequest(any(), any(), any(), any(), any())) + .thenReturn(BrokerResponseNative.EMPTY_RESULT); + _pinotClientRequest.processSqlQueryWithBothEnginesAndCompareResults("{\"sql\": \"SELECT * FROM mytable\"}", + asyncResponse, request, null); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(JsonNode.class); + ArgumentCaptor sqlNodeAndOptionsCaptor = ArgumentCaptor.forClass(SqlNodeAndOptions.class); + verify(_requestHandler, times(2)).handleRequest(requestCaptor.capture(), sqlNodeAndOptionsCaptor.capture(), + any(), any(), any()); + verify(asyncResponse, times(1)).resume(any(Response.class)); + + assertEquals(requestCaptor.getAllValues().size(), 2); + assertEquals(requestCaptor.getAllValues().get(0).get("sql").asText(), "SELECT * FROM mytable"); + assertEquals(requestCaptor.getAllValues().get(1).get("sql").asText(), "SELECT * FROM mytable"); + + assertEquals(sqlNodeAndOptionsCaptor.getAllValues().size(), 2); + assertFalse(sqlNodeAndOptionsCaptor.getAllValues().get(0).getOptions() + .containsKey(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE)); + assertEquals(sqlNodeAndOptionsCaptor.getAllValues().get(1).getOptions() + .get(CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE), "true"); + } + + @Test + public void testPinotQueryComparisonApiDifferentQuery() throws Exception { + AsyncResponse asyncResponse = mock(AsyncResponse.class); + Request request = mock(Request.class); + when(request.getRequestURL()).thenReturn(new StringBuilder()); + when(_requestHandler.handleRequest(any(), any(), any(), any(), any())) + .thenReturn(BrokerResponseNative.EMPTY_RESULT); + _pinotClientRequest.processSqlQueryWithBothEnginesAndCompareResults("{\"v1sql\": \"SELECT v1 FROM mytable\"," + + "\"v2sql\": \"SELECT v2 FROM mytable\"}", asyncResponse, request, null); + + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(JsonNode.class); + verify(_requestHandler, times(2)).handleRequest(requestCaptor.capture(), any(), any(), any(), any()); + verify(asyncResponse, times(1)).resume(any(Response.class)); + + assertEquals(requestCaptor.getAllValues().size(), 2); + assertEquals(requestCaptor.getAllValues().get(0).get("sql").asText(), "SELECT v1 FROM mytable"); + assertEquals(requestCaptor.getAllValues().get(1).get("sql").asText(), "SELECT v2 FROM mytable"); + } + + @Test + public 
void testPinotQueryComparisonApiMissingSql() throws Exception { + AsyncResponse asyncResponse = mock(AsyncResponse.class); + Request request = mock(Request.class); + when(request.getRequestURL()).thenReturn(new StringBuilder()); + // Both v1sql and v2sql should be present + _pinotClientRequest.processSqlQueryWithBothEnginesAndCompareResults("{\"v1sql\": \"SELECT v1 FROM mytable\"}", + asyncResponse, request, null); + + verify(_requestHandler, never()).handleRequest(any(), any(), any(), any(), any()); + verify(asyncResponse, times(1)).resume(any(Throwable.class)); + } + + @Test + public void testPinotQueryComparison() throws Exception { + // Aggregation type difference + + BrokerResponse v1BrokerResponse = new BrokerResponseNative(); + DataSchema v1DataSchema = new DataSchema(new String[]{"sum(col)"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE}); + v1BrokerResponse.setResultTable(new ResultTable(v1DataSchema, List.of(new Object[]{1234}))); + + BrokerResponse v2BrokerResponse = new BrokerResponseNative(); + DataSchema v2DataSchema = new DataSchema(new String[]{"EXPR$0"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG}); + v2BrokerResponse.setResultTable(new ResultTable(v2DataSchema, List.of(new Object[]{1234}))); + + ObjectNode comparisonResponse = (ObjectNode) PinotClientRequest.getPinotQueryComparisonResponse( + "SELECT SUM(col) FROM mytable", v1BrokerResponse, v2BrokerResponse).getEntity(); + + List comparisonAnalysis = new ObjectMapper().readerFor(new TypeReference>() { }) + .readValue(comparisonResponse.get("comparisonAnalysis")); + + assertEquals(comparisonAnalysis.size(), 1); + Assert.assertTrue(comparisonAnalysis.get(0).contains("v1 type: DOUBLE, v2 type: LONG")); + + // Default limit in v1 + + v1DataSchema = new DataSchema(new String[]{"col1"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}); + v2DataSchema = new DataSchema(new String[]{"col1"}, + new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}); + List rows = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + rows.add(new Object[]{i}); + } + v1BrokerResponse.setResultTable(new ResultTable(v1DataSchema, new ArrayList<>(rows))); + for (int i = 10; i < 100; i++) { + rows.add(new Object[]{i}); + } + v2BrokerResponse.setResultTable(new ResultTable(v2DataSchema, new ArrayList<>(rows))); + + comparisonResponse = (ObjectNode) PinotClientRequest.getPinotQueryComparisonResponse( + "SELECT col1 FROM mytable", v1BrokerResponse, v2BrokerResponse).getEntity(); + + comparisonAnalysis = new ObjectMapper().readerFor(new TypeReference>() { }) + .readValue(comparisonResponse.get("comparisonAnalysis")); + + assertEquals(comparisonAnalysis.size(), 1); + Assert.assertTrue(comparisonAnalysis.get(0).contains("Mismatch in number of rows returned")); + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 8cadec6bfcc2..77595d6181fa 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -30,6 +30,10 @@ public enum ServerMeter implements AbstractMetrics.Meter { REQUEST_DESERIALIZATION_EXCEPTIONS("exceptions", true), RESPONSE_SERIALIZATION_EXCEPTIONS("exceptions", true), SCHEDULING_TIMEOUT_EXCEPTIONS("exceptions", true), + NUM_SECONDARY_QUERIES("queries", false), + NUM_SECONDARY_QUERIES_SCHEDULED("queries", false), + 
SERVER_OUT_OF_CAPACITY_EXCEPTIONS("exceptions", false), + QUERY_EXECUTION_EXCEPTIONS("exceptions", false), HELIX_ZOOKEEPER_RECONNECTS("reconnects", true), DELETED_SEGMENT_COUNT("segments", false), @@ -53,6 +57,9 @@ public enum ServerMeter implements AbstractMetrics.Meter { PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false), UPSERT_OUT_OF_ORDER("rows", false), DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED("rows", false), + TOTAL_KEYS_MARKED_FOR_DELETION("rows", false), + DELETED_KEYS_WITHIN_TTL_WINDOW("rows", false), + DELETED_TTL_KEYS_IN_MULTIPLE_SEGMENTS("rows", false), METADATA_TTL_PRIMARY_KEYS_REMOVED("rows", false), UPSERT_MISSED_VALID_DOC_ID_SNAPSHOT_COUNT("segments", false), UPSERT_PRELOAD_FAILURE("count", false), diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java index b3e5e7064126..63b42440a68b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java @@ -59,6 +59,9 @@ public enum ServerTimer implements AbstractMetrics.Timer { DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false, "Total time taken to delete expired dedup primary keys based on metadataTTL or deletedKeysTTL"), + SECONDARY_Q_WAIT_TIME_MS("milliseconds", false, + "Time spent waiting in the secondary queue when BinaryWorkloadScheduler is used."), + // Multi-stage /** * Time spent building the hash table for the join. diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java index fe1b348a28fd..d1900f6a8f49 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java @@ -310,4 +310,8 @@ public static WindowOverFlowMode getWindowOverflowMode(Map query public static boolean isSkipUnavailableServers(Map queryOptions) { return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UNAVAILABLE_SERVERS)); } + + public static boolean isSecondaryWorkload(Map queryOptions) { + return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD)); + } } diff --git a/pinot-compatibility-verifier/pom.xml b/pinot-compatibility-verifier/pom.xml index 3a024f480282..38cc3374ded3 100644 --- a/pinot-compatibility-verifier/pom.xml +++ b/pinot-compatibility-verifier/pom.xml @@ -34,7 +34,6 @@ ${basedir}/.. - 2.8.2 diff --git a/pinot-connectors/pinot-spark-2-connector/pom.xml b/pinot-connectors/pinot-spark-2-connector/pom.xml index a40d20dfa879..92a9d4142cb8 100644 --- a/pinot-connectors/pinot-spark-2-connector/pom.xml +++ b/pinot-connectors/pinot-spark-2-connector/pom.xml @@ -32,9 +32,6 @@ https://pinot.apache.org/ ${basedir}/../.. 
- 2.4.8 - 2.3.0 - 3.2.18 org.apache.pinot.\$internal @@ -48,14 +45,18 @@ org.scala-lang.modules scala-xml_${scala.compat.version} - ${scalaxml.version} org.apache.spark spark-sql_${scala.compat.version} - ${spark.version} + ${spark2.version} provided + + + org.apache.avro + avro-mapred + log4j log4j @@ -66,6 +67,16 @@ + + org.apache.avro + avro-mapred + provided + + + org.apache.logging.log4j + log4j-slf4j-impl + provided + org.scala-lang scala-library @@ -75,7 +86,6 @@ org.scalatest scalatest_${scala.compat.version} - ${scalatest.version} test @@ -122,7 +132,6 @@ Thus, explicitly adding this plugin to a new profile to sign the files at the end all at once. --> org.apache.maven.plugins maven-gpg-plugin - 3.2.5 @@ -155,20 +164,6 @@ org.scalatest scalatest-maven-plugin - 2.2.0 - - ${project.build.directory}/surefire-reports - . - false - - - - test - - test - - - diff --git a/pinot-connectors/pinot-spark-3-connector/pom.xml b/pinot-connectors/pinot-spark-3-connector/pom.xml index 5b16c720ee87..c4ae0528b6a7 100644 --- a/pinot-connectors/pinot-spark-3-connector/pom.xml +++ b/pinot-connectors/pinot-spark-3-connector/pom.xml @@ -32,8 +32,6 @@ https://pinot.apache.org/ ${basedir}/../.. - 3.5.2 - 3.2.18 org.apache.pinot.\$internal @@ -47,7 +45,7 @@ org.apache.spark spark-sql_${scala.compat.version} - ${spark.version} + ${spark3.version} provided @@ -59,7 +57,6 @@ org.scalatest scalatest_${scala.compat.version} - ${scalatest.version} test @@ -106,7 +103,6 @@ Thus, explicitly adding this plugin to a new profile to sign the files at the end all at once. --> org.apache.maven.plugins maven-gpg-plugin - 3.2.5 @@ -139,20 +135,6 @@ org.scalatest scalatest-maven-plugin - 2.2.0 - - ${project.build.directory}/surefire-reports - . - false - - - - test - - test - - - diff --git a/pinot-connectors/pinot-spark-common/pom.xml b/pinot-connectors/pinot-spark-common/pom.xml index e69c6a0a93d4..353216f5eb0c 100644 --- a/pinot-connectors/pinot-spark-common/pom.xml +++ b/pinot-connectors/pinot-spark-common/pom.xml @@ -32,9 +32,6 @@ https://pinot.apache.org/ ${basedir}/../.. - 0.14.9 - 2.3.0 - 3.2.18 @@ -51,17 +48,14 @@ org.scala-lang.modules scala-xml_${scala.compat.version} - ${scalaxml.version} io.circe circe-parser_${scala.compat.version} - ${circe.version} io.circe circe-generic_${scala.compat.version} - ${circe.version} org.scala-lang @@ -72,22 +66,14 @@ org.scalatest scalatest_${scala.compat.version} - ${scalatest.version} test - org.xolstice.maven.plugins protobuf-maven-plugin - 0.6.1 - - com.google.protobuf:protoc:3.19.2:exe:${os.detected.classifier} - grpc-java - io.grpc:protoc-gen-grpc-java:1.44.1:exe:${os.detected.classifier} - @@ -114,7 +100,6 @@ Thus, explicitly adding this plugin to a new profile to sign the files at the end all at once. --> org.apache.maven.plugins maven-gpg-plugin - 3.2.5 @@ -147,20 +132,6 @@ org.scalatest scalatest-maven-plugin - 2.2.0 - - ${project.build.directory}/surefire-reports - . 
- false - - - - test - - test - - - diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml index ef2c82378a67..466b5d9c92cf 100644 --- a/pinot-controller/pom.xml +++ b/pinot-controller/pom.xml @@ -103,7 +103,6 @@ com.github.eirslett frontend-maven-plugin - 1.15.0 diff --git a/pinot-controller/src/main/resources/package-lock.json b/pinot-controller/src/main/resources/package-lock.json index f56f0aaeff8a..dc906ee36b0d 100644 --- a/pinot-controller/src/main/resources/package-lock.json +++ b/pinot-controller/src/main/resources/package-lock.json @@ -3471,9 +3471,9 @@ "dev": true }, "node_modules/elliptic": { - "version": "6.5.4", - "resolved": "https://registry.npmjs.org/elliptic/-/elliptic-6.5.4.tgz", - "integrity": "sha512-iLhC6ULemrljPZb+QutR5TQGB+pdW6KGD5RSegS+8sorOZT+rdQFbsQFJgvN3eRqNALqJer4oQ16YvJHlU8hzQ==", + "version": "6.5.7", + "resolved": "https://registry.npmjs.org/elliptic/-/elliptic-6.5.7.tgz", + "integrity": "sha512-ESVCtTwiA+XhY3wyh24QqRGBoP3rEdDUl3EDUUo9tft074fi19IrdpH7hLCMMP3CIj7jb3W96rn8lt/BqIlt5Q==", "dev": true, "dependencies": { "bn.js": "^4.11.9", @@ -16940,9 +16940,9 @@ "dev": true }, "elliptic": { - "version": "6.5.4", - "resolved": "https://registry.npmjs.org/elliptic/-/elliptic-6.5.4.tgz", - "integrity": "sha512-iLhC6ULemrljPZb+QutR5TQGB+pdW6KGD5RSegS+8sorOZT+rdQFbsQFJgvN3eRqNALqJer4oQ16YvJHlU8hzQ==", + "version": "6.5.7", + "resolved": "https://registry.npmjs.org/elliptic/-/elliptic-6.5.7.tgz", + "integrity": "sha512-ESVCtTwiA+XhY3wyh24QqRGBoP3rEdDUl3EDUUo9tft074fi19IrdpH7hLCMMP3CIj7jb3W96rn8lt/BqIlt5Q==", "dev": true, "requires": { "bn.js": "^4.11.9", diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java new file mode 100644 index 000000000000..93e84c1e6a98 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/BinaryWorkloadScheduler.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.query.scheduler; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.MoreExecutors; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAccumulator; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.metrics.ServerQueryPhase; +import org.apache.pinot.common.metrics.ServerTimer; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.core.query.executor.QueryExecutor; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.core.query.scheduler.resources.BinaryWorkloadResourceManager; +import org.apache.pinot.core.query.scheduler.resources.QueryExecutorService; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This scheduler is designed to deal with two types of workloads: + * 1. Primary Workloads -> regular queries from the application + * 2. Secondary Workloads -> ad hoc queries fired from tools, testing, etc. + * + * + * Primary Workload Queries + * Primary workload queries are executed with priority and submitted to the Runner threads as and when they arrive. + * The resources used by a primary workload query are not capped. + * + * Secondary Workload Queries + * - Secondary workload queries are identified using a query option -> "SET isSecondaryWorkload=true" + * - Secondary workload queries are contained as follows: + * - Restrictions on the number of runner threads available to process secondary queries + * - Restrictions on the total number of worker threads available to process a single secondary query + * - Restrictions on the total number of worker threads available to process all in-progress secondary queries + */ +public class BinaryWorkloadScheduler extends QueryScheduler { + private static final Logger LOGGER = LoggerFactory.getLogger(BinaryWorkloadScheduler.class); + + public static final String MAX_SECONDARY_QUERIES = "binarywlm.maxSecondaryRunnerThreads"; + public static final int DEFAULT_MAX_SECONDARY_QUERIES = 5; + + // Secondary Workload Runners.
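  // Illustrative example (not part of this patch; the table name is a placeholder): a query is tagged for the
  // secondary workload via the option described in the class Javadoc, e.g.
  //   SET isSecondaryWorkload=true; SELECT COUNT(*) FROM myTable
  // Queries without this option take the primary-workload fast path in submit() below.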
+ private final int _numSecondaryRunners; + private final Semaphore _secondaryRunnerSemaphore; + + private final SecondaryWorkloadQueue _secondaryQueryQ; + + Thread _scheduler; + + public BinaryWorkloadScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ServerMetrics metrics, + LongAccumulator latestQueryTime) { + super(config, queryExecutor, new BinaryWorkloadResourceManager(config), metrics, latestQueryTime); + + _secondaryQueryQ = new SecondaryWorkloadQueue(config, _resourceManager); + _numSecondaryRunners = config.getProperty(MAX_SECONDARY_QUERIES, DEFAULT_MAX_SECONDARY_QUERIES); + LOGGER.info("numSecondaryRunners={}", _numSecondaryRunners); + _secondaryRunnerSemaphore = new Semaphore(_numSecondaryRunners); + } + + @Override + public String name() { + return "BinaryWorkloadScheduler"; + } + + @Override + public ListenableFuture submit(ServerQueryRequest queryRequest) { + if (!_isRunning) { + return immediateErrorResponse(queryRequest, QueryException.SERVER_SCHEDULER_DOWN_ERROR); + } + + queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT); + if (!QueryOptionsUtils.isSecondaryWorkload(queryRequest.getQueryContext().getQueryOptions())) { + QueryExecutorService queryExecutorService = _resourceManager.getExecutorService(queryRequest, null); + ListenableFutureTask queryTask = createQueryFutureTask(queryRequest, queryExecutorService); + _resourceManager.getQueryRunners().submit(queryTask); + return queryTask; + } + + final SchedulerQueryContext schedQueryContext = new SchedulerQueryContext(queryRequest); + try { + // Update metrics + _serverMetrics.addMeteredTableValue(queryRequest.getTableNameWithType(), ServerMeter.NUM_SECONDARY_QUERIES, 1L); + + _secondaryQueryQ.put(schedQueryContext); + } catch (OutOfCapacityException e) { + LOGGER.error("Out of capacity for query {} table {}, message: {}", queryRequest.getRequestId(), + queryRequest.getTableNameWithType(), e.getMessage()); + return immediateErrorResponse(queryRequest, QueryException.SERVER_OUT_OF_CAPACITY_ERROR); + } catch (Exception e) { + // We should not throw any exception other than OutOfCapacityException. Signal that there's an issue with + // the scheduler if any other exception is thrown. + LOGGER.error("Internal error for query {} table {}, message {}", queryRequest.getRequestId(), + queryRequest.getTableNameWithType(), e.getMessage()); + return immediateErrorResponse(queryRequest, QueryException.SERVER_SCHEDULER_DOWN_ERROR); + } + return schedQueryContext.getResultFuture(); + } + + @Override + public void start() { + super.start(); + _scheduler = getScheduler(); + _scheduler.setName("scheduler"); + // TODO: Consider setting a lower priority to avoid a busy loop when all threads are busy processing queries. + _scheduler.setPriority(Thread.MAX_PRIORITY); + _scheduler.setDaemon(true); + _scheduler.start(); + } + + private Thread getScheduler() { + return new Thread(new Runnable() { + @Override + public void run() { + while (_isRunning) { + try { + _secondaryRunnerSemaphore.acquire(); + } catch (InterruptedException e) { + if (!_isRunning) { + LOGGER.info("Shutting down scheduler"); + } else { + LOGGER.error("Interrupt while acquiring semaphore.
Exiting.", e); + } + break; + } + try { + final SchedulerQueryContext request = _secondaryQueryQ.take(); + if (request == null) { + continue; + } + ServerQueryRequest queryRequest = request.getQueryRequest(); + final QueryExecutorService executor = + _resourceManager.getExecutorService(queryRequest, request.getSchedulerGroup()); + final ListenableFutureTask queryFutureTask = createQueryFutureTask(queryRequest, executor); + queryFutureTask.addListener(new Runnable() { + @Override + public void run() { + executor.releaseWorkers(); + request.getSchedulerGroup().endQuery(); + _secondaryRunnerSemaphore.release(); + checkStopResourceManager(); + } + }, MoreExecutors.directExecutor()); + + // Update metrics + updateSecondaryWorkloadMetrics(queryRequest); + + request.setResultFuture(queryFutureTask); + request.getSchedulerGroup().startQuery(); + _resourceManager.getQueryRunners().submit(queryFutureTask); + } catch (Throwable t) { + LOGGER.error( + "Error in scheduler thread. This is indicative of a bug. Please report this. Server will continue " + + "with errors", t); + } + } + if (_isRunning) { + throw new RuntimeException("FATAL: Scheduler thread is quitting.....something went horribly wrong.....!!!"); + } else { + failAllPendingQueries(); + } + } + }); + } + + private void updateSecondaryWorkloadMetrics(ServerQueryRequest queryRequest) { + long timeInQMs = System.currentTimeMillis() - queryRequest.getTimerContext().getQueryArrivalTimeMs(); + _serverMetrics.addTimedTableValue(queryRequest.getTableNameWithType(), ServerTimer.SECONDARY_Q_WAIT_TIME_MS, + timeInQMs, TimeUnit.MILLISECONDS); + _serverMetrics.addMeteredTableValue(queryRequest.getTableNameWithType(), + ServerMeter.NUM_SECONDARY_QUERIES_SCHEDULED, 1L); + } + + @Override + public void stop() { + super.stop(); + // without this, scheduler will never stop if there are no pending queries + if (_scheduler != null) { + _scheduler.interrupt(); + } + } + + private void checkStopResourceManager() { + if (!_isRunning && _secondaryRunnerSemaphore.availablePermits() == _numSecondaryRunners) { + _resourceManager.stop(); + } + } + + synchronized private void failAllPendingQueries() { + List pending = _secondaryQueryQ.drain(); + for (SchedulerQueryContext queryContext : pending) { + queryContext.setResultFuture( + immediateErrorResponse(queryContext.getQueryRequest(), QueryException.SERVER_SCHEDULER_DOWN_ERROR)); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java index 86c7170d93c8..69dd8bed3650 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactory.java @@ -45,6 +45,7 @@ private QuerySchedulerFactory() { public static final String FCFS_ALGORITHM = "fcfs"; public static final String TOKEN_BUCKET_ALGORITHM = "tokenbucket"; public static final String BOUNDED_FCFS_ALGORITHM = "bounded_fcfs"; + public static final String BINARY_WORKLOAD_ALGORITHM = "binary_workload"; public static final String ALGORITHM_NAME_CONFIG_KEY = "name"; public static final String DEFAULT_QUERY_SCHEDULER_ALGORITHM = FCFS_ALGORITHM; @@ -73,6 +74,9 @@ public static QueryScheduler create(PinotConfiguration schedulerConfig, QueryExe case BOUNDED_FCFS_ALGORITHM: scheduler = BoundedFCFSScheduler.create(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime); break; + case 
BINARY_WORKLOAD_ALGORITHM: + scheduler = new BinaryWorkloadScheduler(schedulerConfig, queryExecutor, serverMetrics, latestQueryTime); + break; default: scheduler = getQuerySchedulerByClassName(schedulerName, schedulerConfig, queryExecutor, serverMetrics, latestQueryTime); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/SecondaryWorkloadQueue.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/SecondaryWorkloadQueue.java new file mode 100644 index 000000000000..82736dbf0f2b --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/SecondaryWorkloadQueue.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.scheduler; + + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import javax.annotation.Nullable; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.core.query.scheduler.fcfs.FCFSSchedulerGroup; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Queue to maintain secondary workload queries. Used by the BinaryWorkloadScheduler. 
+ */ +public class SecondaryWorkloadQueue { + private static final Logger LOGGER = LoggerFactory.getLogger(SecondaryWorkloadQueue.class); + private static final String SECONDARY_WORKLOAD_GROUP_NAME = "Secondary"; + + public static final String SECONDARY_QUEUE_QUERY_TIMEOUT = "binarywlm.secondaryQueueQueryTimeout"; + private static final int DEFAULT_SECONDARY_QUEUE_QUERY_TIMEOUT_SEC = 40; + + public static final String MAX_PENDING_SECONDARY_QUERIES = "binarywlm.maxPendingSecondaryQueries"; + private static final int DEFAULT_MAX_PENDING_SECONDARY_QUERIES = 20; + + public static final String QUEUE_WAKEUP_MS = "binarywlm.queueWakeupMs"; + private static final int DEFAULT_WAKEUP_MS = 1; + + private static int _wakeUpTimeMs; + private final int _maxPendingPerGroup; + + private final SchedulerGroup _schedulerGroup; + + private final Lock _queueLock = new ReentrantLock(); + private final Condition _queryReaderCondition = _queueLock.newCondition(); + private final ResourceManager _resourceManager; + private final int _queryDeadlineMs; + + public SecondaryWorkloadQueue(PinotConfiguration config, ResourceManager resourceManager) { + Preconditions.checkNotNull(config); + Preconditions.checkNotNull(resourceManager); + + _queryDeadlineMs = + config.getProperty(SECONDARY_QUEUE_QUERY_TIMEOUT, DEFAULT_SECONDARY_QUEUE_QUERY_TIMEOUT_SEC) * 1000; + _wakeUpTimeMs = config.getProperty(QUEUE_WAKEUP_MS, DEFAULT_WAKEUP_MS); + _maxPendingPerGroup = config.getProperty(MAX_PENDING_SECONDARY_QUERIES, DEFAULT_MAX_PENDING_SECONDARY_QUERIES); + LOGGER.info("queryDeadlineMs={}, wakeupTimeMs={},maxPendingPerGroup={}", _queryDeadlineMs, _wakeUpTimeMs, + _maxPendingPerGroup); + _schedulerGroup = new FCFSSchedulerGroup(SECONDARY_WORKLOAD_GROUP_NAME); + _resourceManager = resourceManager; + } + + /** + * Adds a query to the secondary workload queue. 
+ * @param query + * @throws OutOfCapacityException + */ + public void put(SchedulerQueryContext query) + throws OutOfCapacityException { + Preconditions.checkNotNull(query); + _queueLock.lock(); + try { + checkSchedulerGroupCapacity(query); + query.setSchedulerGroupContext(_schedulerGroup); + _schedulerGroup.addLast(query); + _queryReaderCondition.signal(); + } finally { + _queueLock.unlock(); + } + } + + /** + * Blocking call to read the next query + * @return + */ + @Nullable + public SchedulerQueryContext take() { + _queueLock.lock(); + try { + while (true) { + SchedulerQueryContext schedulerQueryContext; + while ((schedulerQueryContext = takeNextInternal()) == null) { + try { + _queryReaderCondition.await(_wakeUpTimeMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + return null; + } + } + return schedulerQueryContext; + } + } finally { + _queueLock.unlock(); + } + } + + public List drain() { + List pending = new ArrayList<>(); + _queueLock.lock(); + try { + while (!_schedulerGroup.isEmpty()) { + pending.add(_schedulerGroup.removeFirst()); + } + } finally { + _queueLock.unlock(); + } + return pending; + } + + private SchedulerQueryContext takeNextInternal() { + long startTimeMs = System.currentTimeMillis(); + long deadlineEpochMillis = startTimeMs - _queryDeadlineMs; + + _schedulerGroup.trimExpired(deadlineEpochMillis); + if (_schedulerGroup.isEmpty() || !_resourceManager.canSchedule(_schedulerGroup)) { + return null; + } + + if (LOGGER.isDebugEnabled()) { + StringBuilder sb = new StringBuilder("SchedulerInfo:"); + sb.append(_schedulerGroup.toString()); + ServerQueryRequest queryRequest = _schedulerGroup.peekFirst().getQueryRequest(); + sb.append(String.format(" Group: %s: [%d,%d,%d,%d]", _schedulerGroup.name(), + queryRequest.getTimerContext().getQueryArrivalTimeMs(), queryRequest.getRequestId(), + queryRequest.getSegmentsToQuery().size(), startTimeMs)); + LOGGER.debug(sb.toString()); + } + + SchedulerQueryContext query = _schedulerGroup.removeFirst(); + return query; + } + + private void checkSchedulerGroupCapacity(SchedulerQueryContext query) + throws OutOfCapacityException { + if (_schedulerGroup.numPending() >= _maxPendingPerGroup + && _schedulerGroup.totalReservedThreads() >= _resourceManager.getTableThreadsHardLimit()) { + throw new OutOfCapacityException(String.format( + "SchedulerGroup %s is out of capacity. numPending: %d, maxPending: %d, reservedThreads: %d " + + "threadsHardLimit: %d", _schedulerGroup.name(), _schedulerGroup.numPending(), _maxPendingPerGroup, + _schedulerGroup.totalReservedThreads(), _resourceManager.getTableThreadsHardLimit())); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java new file mode 100644 index 000000000000..fb7ffd93e3f9 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/resources/BinaryWorkloadResourceManager.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.scheduler.resources; + +import com.google.common.base.Preconditions; +import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.core.query.scheduler.SchedulerGroupAccountant; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * ResourceManager for BinaryWorkloadScheduler. + */ +public class BinaryWorkloadResourceManager extends ResourceManager { + private static final Logger LOGGER = LoggerFactory.getLogger(BinaryWorkloadResourceManager.class); + private final ResourceLimitPolicy _secondaryWorkloadPolicy; + + public BinaryWorkloadResourceManager(PinotConfiguration config) { + super(config); + _secondaryWorkloadPolicy = new ResourceLimitPolicy(config, _numQueryWorkerThreads); + } + + /** + * Returns an executor service that query executor can use like a dedicated + * service for submitting jobs for parallel execution. + * @param query + * @param accountant Accountant for a scheduler group + * @return UnboundedExecutorService for primary workload queries. For secondary workload queries, returns a + * BoundedAccountingExecutor service that limits the number of threads available for query execution. Query + * execution can submit tasks for parallel execution without need + * for limiting their parallelism. + */ + @Override + public QueryExecutorService getExecutorService(ServerQueryRequest query, SchedulerGroupAccountant accountant) { + if (!QueryOptionsUtils.isSecondaryWorkload(query.getQueryContext().getQueryOptions())) { + return getPrimaryWorkloadExecutorService(); + } + + return getSecondaryWorkloadExecutorService(query, accountant); + } + + @Override + public int getTableThreadsHardLimit() { + return _secondaryWorkloadPolicy.getTableThreadsHardLimit(); + } + + @Override + public int getTableThreadsSoftLimit() { + return _secondaryWorkloadPolicy.getTableThreadsSoftLimit(); + } + + private QueryExecutorService getPrimaryWorkloadExecutorService() { + return new QueryExecutorService() { + @Override + public void execute(Runnable command) { + _queryWorkers.submit(command); + } + }; + } + + private QueryExecutorService getSecondaryWorkloadExecutorService(ServerQueryRequest query, + SchedulerGroupAccountant accountant) { + int numSegments = query.getSegmentsToQuery().size(); + int queryThreadLimit = Math.max(1, Math.min(_secondaryWorkloadPolicy.getMaxThreadsPerQuery(), numSegments)); + int spareThreads = _secondaryWorkloadPolicy.getTableThreadsHardLimit() - accountant.totalReservedThreads(); + if (spareThreads <= 0) { + LOGGER.warn("UNEXPECTED: Attempt to schedule query uses more than the configured hard limit on threads"); + spareThreads = 1; + } else { + spareThreads = Math.min(spareThreads, queryThreadLimit); + } + Preconditions.checkState(spareThreads >= 1); + // We do not bound number of threads here by total available threads. We can potentially + // over-provision number of threads here. That is intentional and (potentially) good solution. 
+ // Queries don't use their workers all the time. So, reserving workers leads to suboptimal resource + // utilization. We want to keep the pipe as full as possible for query workers. Overprovisioning is one + // way to achieve that (in fact, only way for us). There is a couter-argument to be made that overprovisioning + // can impact cache-lines and memory in general. + // We use this thread reservation only to determine priority based on resource utilization and not as a way to + // improve system performance (because we don't have good insight on that yet) + accountant.addReservedThreads(spareThreads); + // TODO: For 1 thread we should have the query run in the same queryRunner thread + // by supplying an executor service that similar to Guava' directExecutor() + return new BoundedAccountingExecutor(_queryWorkers, spareThreads, accountant); + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java index a495c6c5d768..9c4c59ea1efc 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerFactoryTest.java @@ -55,6 +55,11 @@ public void testQuerySchedulerFactory() { queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime); assertTrue(queryScheduler instanceof BoundedFCFSScheduler); + config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, + QuerySchedulerFactory.BINARY_WORKLOAD_ALGORITHM); + queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime); + assertTrue(queryScheduler instanceof BinaryWorkloadScheduler); + config.setProperty(QuerySchedulerFactory.ALGORITHM_NAME_CONFIG_KEY, TestQueryScheduler.class.getName()); queryScheduler = QuerySchedulerFactory.create(config, queryExecutor, serverMetrics, latestQueryTime); assertTrue(queryScheduler instanceof TestQueryScheduler); diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml index 9beb15454052..f84aa8ef27ba 100644 --- a/pinot-distribution/pom.xml +++ b/pinot-distribution/pom.xml @@ -50,7 +50,6 @@ net.nicoulaj.maven.plugins checksum-maven-plugin - 1.11 @@ -132,7 +131,6 @@ org.codehaus.mojo exec-maven-plugin - 3.4.1 remove-build-directory diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pom.xml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pom.xml index 7dfbb2a643e1..f8f7c0763918 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-2.4/pom.xml @@ -34,9 +34,6 @@ ${basedir}/../../.. 
package - 2.11 - 2.4.6 - 2.11.12 @@ -46,25 +43,14 @@ org.apache.spark - spark-core_${scala.major.version} - ${spark.version} + spark-core_${scala.compat.version} + ${spark2.version} provided + - com.zaxxer - HikariCP-java7 - - - com.twitter - chill_2.11 - - - com.twitter - chill-java - - - org.apache.curator - curator-recipes + org.apache.avro + avro-mapred log4j @@ -76,10 +62,19 @@ + + org.apache.avro + avro-mapred + provided + + + org.apache.logging.log4j + log4j-slf4j-impl + provided + org.scala-lang scala-library - ${scala.minor.version} provided @@ -93,13 +88,11 @@ com.esotericsoftware.kryo kryo - 2.24.0 test com.twitter chill_2.11 - 0.10.0 test diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/pom.xml b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/pom.xml index 30c463d563c9..f19057a87e0e 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/pom.xml +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/pom.xml @@ -34,7 +34,6 @@ ${basedir}/../../.. package - 3.5.2 @@ -45,24 +44,12 @@ org.apache.spark spark-core_${scala.compat.version} - ${spark.version} + ${spark3.version} provided - com.zaxxer - HikariCP-java7 - - - com.twitter - chill_2.11 - - - com.twitter - chill-java - - - org.apache.curator - curator-recipes + commons-logging + commons-logging log4j @@ -72,16 +59,11 @@ org.slf4j slf4j-log4j12 - - commons-logging - commons-logging - org.scala-lang scala-library - ${scala.version} provided diff --git a/pinot-plugins/pinot-file-system/pinot-hdfs/pom.xml b/pinot-plugins/pinot-file-system/pinot-hdfs/pom.xml index 6986ae37e09c..32074000b5a3 100644 --- a/pinot-plugins/pinot-file-system/pinot-hdfs/pom.xml +++ b/pinot-plugins/pinot-file-system/pinot-hdfs/pom.xml @@ -39,6 +39,11 @@ org.apache.hadoop hadoop-common + + + org.bouncycastle + bcprov-jdk18on + org.codehaus.woodstox stax2-api diff --git a/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml b/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml index 642d84cf2f6d..5ea6510f58af 100644 --- a/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml +++ b/pinot-plugins/pinot-input-format/pinot-confluent-avro/pom.xml @@ -33,7 +33,6 @@ https://pinot.apache.org/ ${basedir}/../../.. 
- 2.8.2 package @@ -50,23 +49,14 @@ org.apache.kafka kafka-clients - ${kafka.lib.version} io.confluent kafka-schema-registry-client - ${confluent.version} - - - org.apache.kafka - kafka-clients - - io.confluent kafka-avro-serializer - ${confluent.version} diff --git a/pinot-plugins/pinot-input-format/pinot-orc/pom.xml b/pinot-plugins/pinot-input-format/pinot-orc/pom.xml index 658d07f00a66..1936710fee8e 100644 --- a/pinot-plugins/pinot-input-format/pinot-orc/pom.xml +++ b/pinot-plugins/pinot-input-format/pinot-orc/pom.xml @@ -41,6 +41,11 @@ hadoop-common ${hadoop.dependencies.scope} + + + org.bouncycastle + bcprov-jdk18on + org.apache.hadoop hadoop-hdfs diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/pom.xml b/pinot-plugins/pinot-input-format/pinot-parquet/pom.xml index 1ec75e569c1c..aed20f33abe1 100644 --- a/pinot-plugins/pinot-input-format/pinot-parquet/pom.xml +++ b/pinot-plugins/pinot-input-format/pinot-parquet/pom.xml @@ -49,6 +49,10 @@ hadoop-common ${hadoop.dependencies.scope} + + org.bouncycastle + bcprov-jdk18on + org.apache.hadoop hadoop-client-runtime diff --git a/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml b/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml index 14b0ab5a1174..44a97a74582b 100644 --- a/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml +++ b/pinot-plugins/pinot-input-format/pinot-protobuf/pom.xml @@ -34,7 +34,6 @@ https://pinot.apache.org/ ${basedir}/../../.. - 2.8.2 package @@ -60,23 +59,14 @@ org.apache.kafka kafka-clients - ${kafka.lib.version} io.confluent kafka-schema-registry-client - ${confluent.version} - - - org.apache.kafka - kafka-clients - - io.confluent kafka-protobuf-serializer - ${confluent.version} @@ -95,9 +85,7 @@ org.xolstice.maven.plugins protobuf-maven-plugin - 0.6.1 - com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} ${basedir}/src/test/resources diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml index 0d4b974ad5e5..f2c9391078e3 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/pom.xml @@ -33,7 +33,6 @@ https://pinot.apache.org/ ${basedir}/../../.. - 2.8.2 package @@ -46,12 +45,10 @@ org.apache.kafka kafka-clients - ${kafka.lib.version} org.apache.kafka kafka_${scala.compat.version} - ${kafka.lib.version} com.fasterxml.jackson.module diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml index 24b4a50965ed..7bae0df3e86d 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml @@ -33,22 +33,9 @@ https://pinot.apache.org/ ${basedir}/../../.. 
- 1.0.2 0.2.23 - - - - software.amazon.awssdk - bom - ${aws.sdk.version} - pom - import - - - - software.amazon.awssdk diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml index 700110e4dc90..b07237a8df5e 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/pom.xml @@ -41,13 +41,11 @@ org.apache.pulsar pulsar-client - ${pulsar.version} org.apache.pulsar pulsar-client-admin - ${pulsar.version} test diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index ba24f2c91226..b1fc92a2bf29 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -189,6 +189,8 @@ protected void removeSegment(IndexSegment segment, Iterator primaryK public void doRemoveExpiredPrimaryKeys() { AtomicInteger numMetadataTTLKeysRemoved = new AtomicInteger(); AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger(); + AtomicInteger numTotalKeysMarkForDeletion = new AtomicInteger(); + AtomicInteger numDeletedKeysWithinTTLWindow = new AtomicInteger(); double largestSeenComparisonValue = _largestSeenComparisonValue.get(); double metadataTTLKeysThreshold; if (_metadataTTL > 0) { @@ -208,13 +210,20 @@ public void doRemoveExpiredPrimaryKeys() { if (_metadataTTL > 0 && comparisonValue < metadataTTLKeysThreshold) { _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation); numMetadataTTLKeysRemoved.getAndIncrement(); - } else if (_deletedKeysTTL > 0 && comparisonValue < deletedKeysThreshold) { + } else if (_deletedKeysTTL > 0) { ThreadSafeMutableRoaringBitmap currentQueryableDocIds = recordLocation.getSegment().getQueryableDocIds(); // if key not part of queryable doc id, it means it is deleted if (currentQueryableDocIds != null && !currentQueryableDocIds.contains(recordLocation.getDocId())) { - _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation); - removeDocId(recordLocation.getSegment(), recordLocation.getDocId()); - numDeletedTTLKeysRemoved.getAndIncrement(); + numTotalKeysMarkForDeletion.getAndIncrement(); + if (comparisonValue >= deletedKeysThreshold) { + // If key is within the TTL window, do not remove it from the primary hashmap + numDeletedKeysWithinTTLWindow.getAndIncrement(); + } else { + // delete key from primary hashmap + _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation); + removeDocId(recordLocation.getSegment(), recordLocation.getDocId()); + numDeletedTTLKeysRemoved.getAndIncrement(); + } } } }); @@ -236,6 +245,16 @@ public void doRemoveExpiredPrimaryKeys() { _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED, numDeletedTTLKeys); } + int numTotalKeysMarkedForDeletion = numTotalKeysMarkForDeletion.get(); + if (numTotalKeysMarkedForDeletion > 0) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.TOTAL_KEYS_MARKED_FOR_DELETION, + numTotalKeysMarkedForDeletion); + } + int numDeletedKeysWithinTTLWindowValue = numDeletedKeysWithinTTLWindow.get(); + if (numDeletedKeysWithinTTLWindowValue > 0) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.DELETED_KEYS_WITHIN_TTL_WINDOW, + numDeletedKeysWithinTTLWindowValue); + } } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java index b49d09e04bc1..42218090394c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java @@ -234,7 +234,10 @@ protected void removeSegment(IndexSegment segment, Iterator primaryK @Override public void doRemoveExpiredPrimaryKeys() { + AtomicInteger numTotalKeysMarkForDeletion = new AtomicInteger(); AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger(); + AtomicInteger numDeletedKeysWithinTTLWindow = new AtomicInteger(); + AtomicInteger numDeletedTTLKeysInMultipleSegments = new AtomicInteger(); double largestSeenComparisonValue = _largestSeenComparisonValue.get(); double deletedKeysThreshold; if (_deletedKeysTTL > 0) { @@ -249,20 +252,44 @@ public void doRemoveExpiredPrimaryKeys() { // an issue can arise where the upsert compaction might first process the segment containing the delete record // while the previous segment(s) are not compacted. Upon restart, this can inadvertently revive the key // that was originally marked for deletion. - if (_deletedKeysTTL > 0 && comparisonValue < deletedKeysThreshold - && recordLocation.getDistinctSegmentCount() <= 1) { + if (_deletedKeysTTL > 0) { ThreadSafeMutableRoaringBitmap currentQueryableDocIds = recordLocation.getSegment().getQueryableDocIds(); // if key not part of queryable doc id, it means it is deleted if (currentQueryableDocIds != null && !currentQueryableDocIds.contains(recordLocation.getDocId())) { - _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation); - removeDocId(recordLocation.getSegment(), recordLocation.getDocId()); - numDeletedTTLKeysRemoved.getAndIncrement(); + numTotalKeysMarkForDeletion.getAndIncrement(); + if (comparisonValue >= deletedKeysThreshold) { + // If key is within the TTL window, do not remove it from the primary hashmap + numDeletedKeysWithinTTLWindow.getAndIncrement(); + } else if (recordLocation.getDistinctSegmentCount() > 1) { + // If key is part of multiple segments, do not remove it from the primary hashmap + numDeletedTTLKeysInMultipleSegments.getAndIncrement(); + } else { + // delete key from primary hashmap + _primaryKeyToRecordLocationMap.remove(primaryKey, recordLocation); + removeDocId(recordLocation.getSegment(), recordLocation.getDocId()); + numDeletedTTLKeysRemoved.getAndIncrement(); + } } } }); // Update metrics updatePrimaryKeyGauge(); + int numTotalKeysMarkedForDeletion = numTotalKeysMarkForDeletion.get(); + if (numTotalKeysMarkedForDeletion > 0) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.TOTAL_KEYS_MARKED_FOR_DELETION, + numTotalKeysMarkedForDeletion); + } + int numDeletedKeysWithinTTLWindowValue = numDeletedKeysWithinTTLWindow.get(); + if (numDeletedKeysWithinTTLWindowValue > 0) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_KEYS_WITHIN_TTL_WINDOW, + numDeletedKeysWithinTTLWindowValue); + } + int numDeletedTTLKeysInMultipleSegmentsValue = numDeletedTTLKeysInMultipleSegments.get(); + if 
(numDeletedTTLKeysInMultipleSegmentsValue > 0) { + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DELETED_TTL_KEYS_IN_MULTIPLE_SEGMENTS, + numDeletedTTLKeysInMultipleSegmentsValue); + } int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get(); if (numDeletedTTLKeys > 0) { _logger.info("Deleted {} primary keys based on deletedKeysTTL", numDeletedTTLKeys); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java index 7280b86833d0..882715d7876f 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java @@ -49,8 +49,8 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta public BasePartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) { return _partitionMetadataManagerMap.computeIfAbsent(partitionId, k -> _enableDeletedKeysCompactionConsistency - ? new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _context) - : new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(_tableNameWithType, k, _context)); + ? new ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(_tableNameWithType, k, _context) + : new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _context)); } @Override diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithMinMaxTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithMinMaxTest.java index be2d64951bf7..1d6900a0c029 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithMinMaxTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithMinMaxTest.java @@ -28,6 +28,7 @@ import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; @@ -42,6 +43,7 @@ */ public class SegmentGenerationWithMinMaxTest { private static final String STRING_COLUMN = "col1"; + private static final String[] STRING_VALUES_WITH_COMMA_CHARACTER = {"A,,", ",B,", "C,Z,", "D,", "E,"}; private static final String[] STRING_VALUES_WITH_WHITESPACE_CHARACTERS = {"A ", " B ", " Z ", " \r D", "E"}; private static final String[] STRING_VALUES_VALID = {"A", "B", "C", "D", "E"}; @@ -63,6 +65,39 @@ public void setup() { .addMetric(LONG_COLUMN, FieldSpec.DataType.LONG).build(); } + @Test + public void testMinMaxlength() throws Exception { + //Test1: Min String length is greater than default 512 + DimensionFieldSpec d = new DimensionFieldSpec(STRING_COLUMN, FieldSpec.DataType.STRING, true, 15000, "null"); + _schema = new Schema.SchemaBuilder().addField(d) + .addMetric(LONG_COLUMN, FieldSpec.DataType.LONG).build(); + + FileUtils.deleteQuietly(new File(SEGMENT_DIR_NAME)); + //minLong stRING + String longString = generateLongString(15000, true); + String[] 
longValues = {"{}", "dd", "ff", "ee", longString}; + File segmentDir = buildSegment(_tableConfig, _schema, longValues); + SegmentMetadataImpl metadata = new SegmentMetadataImpl(segmentDir); + Assert.assertEquals(metadata.getTotalDocs(), 5); + Assert.assertFalse(metadata.getColumnMetadataFor("col1").isMinMaxValueInvalid()); + Assert.assertNull(metadata.getColumnMetadataFor("col1").getMinValue()); + Assert.assertEquals(metadata.getColumnMetadataFor("col1").getMaxValue(), "{}"); + + FileUtils.deleteQuietly(new File(SEGMENT_DIR_NAME)); + + //maxLong String + longString = generateLongString(15000, false); + longValues = new String[]{"aa", "dd", "ff", "ee", longString}; + segmentDir = buildSegment(_tableConfig, _schema, longValues); + metadata = new SegmentMetadataImpl(segmentDir); + Assert.assertEquals(metadata.getTotalDocs(), 5); + Assert.assertFalse(metadata.getColumnMetadataFor("col1").isMinMaxValueInvalid()); + Assert.assertEquals(metadata.getColumnMetadataFor("col1").getMinValue(), "aa"); + Assert.assertNull(metadata.getColumnMetadataFor("col1").getMaxValue()); + + FileUtils.deleteQuietly(new File(SEGMENT_DIR_NAME)); + } + @Test public void testMinMaxInMetadata() throws Exception { @@ -111,4 +146,13 @@ private File buildSegment(final TableConfig tableConfig, final Schema schema, St driver.getOutputDirectory().deleteOnExit(); return driver.getOutputDirectory(); } + + private String generateLongString(int length, boolean isMin) { + char repeatingChar = isMin ? 'a' : 'z'; + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append(repeatingChar); + } + return sb.toString(); + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java index 6e46127ff334..a02efd77d44e 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java @@ -3008,4 +3008,39 @@ public void testForwardIndexEnabledWithRawOnExistingForwardIndexDisabledColumn() _indexLoadingConfig.removeNoDictionaryColumns(EXISTING_FORWARD_INDEX_DISABLED_COL_SV, EXISTING_FORWARD_INDEX_DISABLED_COL_MV, EXISTING_STRING_COL_DICT); } + + @Test + public void testNeedAddMinMaxLength() + throws Exception { + + String longString = generateLongString(15000, true); + String[] stringValuesValid = {"B", "C", "D", "E", longString}; + long[] longValues = {1588316400000L, 1588489200000L, 1588662000000L, 1588834800000L, 1589007600000L}; + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("stringCol", FieldSpec.DataType.STRING) + .addMetric("longCol", FieldSpec.DataType.LONG).build(); + + FileUtils.deleteQuietly(INDEX_DIR); + + // build good segment, no needPreprocess + File segment = buildTestSegment(tableConfig, schema, "validSegment", stringValuesValid, longValues); + SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() + .load(segment.toURI(), + new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); + IndexLoadingConfig indexLoadingConfig = getDefaultIndexLoadingConfig(); + 
indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.ALL); + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema); + assertFalse(processor.needProcess()); + + FileUtils.deleteQuietly(INDEX_DIR); + } + + private String generateLongString(int length, boolean isMin) { + char repeatingChar = isMin ? 'a' : 'z'; + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append(repeatingChar); + } + return sb.toString(); + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java index daf10826c95c..08f9796ff649 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java @@ -18,13 +18,20 @@ */ package org.apache.pinot.segment.local.upsert; +import com.google.common.collect.Lists; +import java.io.File; +import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -37,12 +44,21 @@ public class TableUpsertMetadataManagerFactoryTest { public void testCreateForDefaultManagerClass() { UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); upsertConfig.setHashFunction(HashFunction.NONE); + Schema schema = + new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME) + .addSingleValueDimension("myCol", FieldSpec.DataType.STRING) + .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build(); + TableDataManager tableDataManager = mock(TableDataManager.class); + when(tableDataManager.getTableDataDir()).thenReturn(new File(RAW_TABLE_NAME)); _tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setUpsertConfig(upsertConfig).build(); TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig, null); assertNotNull(tableUpsertMetadataManager); assertTrue(tableUpsertMetadataManager instanceof ConcurrentMapTableUpsertMetadataManager); + tableUpsertMetadataManager.init(_tableConfig, schema, tableDataManager); + assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager(0) + instanceof ConcurrentMapPartitionUpsertMetadataManager); } @Test @@ -50,11 +66,20 @@ public void testCreateForManagerClassWithConsistentDeletes() { UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL); upsertConfig.setHashFunction(HashFunction.NONE); upsertConfig.setEnableDeletedKeysCompactionConsistency(true); + Schema schema = + new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME) + .addSingleValueDimension("myCol", FieldSpec.DataType.STRING) + .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build(); + TableDataManager tableDataManager = 
mock(TableDataManager.class); + when(tableDataManager.getTableDataDir()).thenReturn(new File(RAW_TABLE_NAME)); _tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setUpsertConfig(upsertConfig).build(); TableUpsertMetadataManager tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_tableConfig, null); assertNotNull(tableUpsertMetadataManager); - assertTrue(tableUpsertMetadataManager instanceof BaseTableUpsertMetadataManager); + assertTrue(tableUpsertMetadataManager instanceof ConcurrentMapTableUpsertMetadataManager); + tableUpsertMetadataManager.init(_tableConfig, schema, tableDataManager); + assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager(0) + instanceof ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes); } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java index c599ae707e58..fdd1f5c290e3 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java @@ -264,39 +264,13 @@ public static ColumnMetadataImpl fromPropertiesConfiguration(String column, Prop // TODO: Use getProperty() for other properties as well to avoid the overhead of variable substitution String minString = (String) config.getProperty(Column.getKeyFor(column, Column.MIN_VALUE)); String maxString = (String) config.getProperty(Column.getKeyFor(column, Column.MAX_VALUE)); - if (minString != null && maxString != null) { - switch (storedType) { - case INT: - builder.setMinValue(Integer.valueOf(minString)); - builder.setMaxValue(Integer.valueOf(maxString)); - break; - case LONG: - builder.setMinValue(Long.valueOf(minString)); - builder.setMaxValue(Long.valueOf(maxString)); - break; - case FLOAT: - builder.setMinValue(Float.valueOf(minString)); - builder.setMaxValue(Float.valueOf(maxString)); - break; - case DOUBLE: - builder.setMinValue(Double.valueOf(minString)); - builder.setMaxValue(Double.valueOf(maxString)); - break; - case BIG_DECIMAL: - builder.setMinValue(new BigDecimal(minString)); - builder.setMaxValue(new BigDecimal(maxString)); - break; - case STRING: - builder.setMinValue(CommonsConfigurationUtils.recoverSpecialCharacterInPropertyValue(minString)); - builder.setMaxValue(CommonsConfigurationUtils.recoverSpecialCharacterInPropertyValue(maxString)); - break; - case BYTES: - builder.setMinValue(BytesUtils.toByteArray(minString)); - builder.setMaxValue(BytesUtils.toByteArray(maxString)); - break; - default: - throw new IllegalStateException("Unsupported data type: " + dataType + " for column: " + column); - } + // Set min/max value if available + if (minString != null) { + builder.setMinValue(builder.parseValue(storedType, column, minString)); + } + + if (maxString != null) { + builder.setMaxValue(builder.parseValue(storedType, column, maxString)); } builder.setMinMaxValueInvalid(config.getBoolean(Column.getKeyFor(column, Column.MIN_MAX_VALUE_INVALID), false)); @@ -443,5 +417,26 @@ public ColumnMetadataImpl build() { _minMaxValueInvalid, _hasDictionary, _columnMaxLength, _bitsPerElement, _maxNumberOfMultiValues, _totalNumberOfEntries, _partitionFunction, _partitions, _indexSizeMap, _autoGenerated); } + + private Comparable parseValue(DataType storedType, String column, String valueString) { + switch (storedType) { + case 
INT: + return Integer.valueOf(valueString); + case LONG: + return Long.valueOf(valueString); + case FLOAT: + return Float.valueOf(valueString); + case DOUBLE: + return Double.valueOf(valueString); + case BIG_DECIMAL: + return new BigDecimal(valueString); + case STRING: + return CommonsConfigurationUtils.recoverSpecialCharacterInPropertyValue(valueString); + case BYTES: + return BytesUtils.toByteArray(valueString); + default: + throw new IllegalStateException("Unsupported data type: " + storedType + " for column: " + column); + } + } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java index 3c87f5407f81..8d54f212595a 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/plugin/PluginManager.java @@ -81,8 +81,6 @@ public class PluginManager { put("org.apache.pinot.filesystem.LocalPinotFS", "org.apache.pinot.spi.filesystem.LocalPinotFS"); // StreamConsumerFactory - put("org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory", - "org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory"); put("org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory", "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory"); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java index 2cefe507f154..f1ce97cbdf7d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataServerStartable.java @@ -25,7 +25,7 @@ * StreamDataServerStartable is the interface for stream data sources. * Each stream data connector should implement a mock/wrapper of the data server. * - * E.g. KafkaDataServerStartable is a wrapper class of Kafka 0.9 broker. + * E.g. KafkaDataServerStartable is a wrapper class of Kafka 2.x broker. * */ public interface StreamDataServerStartable { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 2bfc61e7f42f..09b31f0fa42f 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -359,6 +359,8 @@ public static class Broker { public static class Request { public static final String SQL = "sql"; + public static final String V1SQL = "v1sql"; + public static final String V2SQL = "v2sql"; public static final String TRACE = "trace"; public static final String QUERY_OPTIONS = "queryOptions"; @@ -428,6 +430,14 @@ public static class QueryOptionKey { // If query submission causes an exception, still continue to submit the query to other servers public static final String SKIP_UNAVAILABLE_SERVERS = "skipUnavailableServers"; + + // Indicates that a query belongs to a secondary workload when using the BinaryWorkloadScheduler. The + // BinaryWorkloadScheduler divides queries into two workloads, primary and secondary. Primary workloads are + // executed in an Unbounded FCFS fashion. However, secondary workloads are executed in a constrainted FCFS + // fashion with limited compute.des queries into two workloads, primary and secondary. Primary workloads are + // executed in an Unbounded FCFS fashion. 
However, secondary workloads are executed in a constrainted FCFS + // fashion with limited compute. + public static final String IS_SECONDARY_WORKLOAD = "isSecondaryWorkload"; } public static class QueryOptionValue { diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java index fce6d245be68..e86d91a26731 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java @@ -190,9 +190,6 @@ public void testBackwardCompatible() { "org.apache.pinot.spi.filesystem.LocalPinotFS"); // StreamConsumerFactory - Assert.assertEquals(PluginManager - .loadClassWithBackwardCompatibleCheck("org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory"), - "org.apache.pinot.plugin.stream.kafka09.KafkaConsumerFactory"); Assert.assertEquals(PluginManager .loadClassWithBackwardCompatibleCheck("org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory"), "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory"); diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml index ec41e8823227..0186d9a4b277 100644 --- a/pinot-tools/pom.xml +++ b/pinot-tools/pom.xml @@ -32,7 +32,6 @@ https://pinot.apache.org/ ${basedir}/.. - 3.5.2 @@ -127,6 +126,11 @@ hadoop-common compile + + + org.bouncycastle + bcprov-jdk18on + org.apache.hadoop hadoop-hdfs @@ -135,7 +139,6 @@ xml-apis xml-apis - compile software.amazon.awssdk @@ -167,7 +170,7 @@ org.apache.spark spark-launcher_${scala.compat.version} - ${spark.version} + ${spark3.version} @@ -452,8 +455,8 @@ + org.apache.maven.plugins maven-resources-plugin - 3.3.1 copy-kafka-resources diff --git a/pom.xml b/pom.xml index 74debe83bfd5..2cb42573814d 100644 --- a/pom.xml +++ b/pom.xml @@ -126,8 +126,6 @@ -Xms4g -Xmx4g true - 3.4.0 - 3.5.0 warning @@ -135,8 +133,6 @@ org.apache.pinot.shaded - 3.4.2 - 3.6.0 none 1.11.3 @@ -150,7 +146,6 @@ 3.0.0 2.42 2.6.1 - 3.30.2-GA 1.6.14 5.17.14 3.4.0 @@ -179,15 +174,19 @@ 0.15.0 0.4.5 4.2.2 - 2.27.9 + 2.27.13 1.2.26 - 1.16.2 + 1.17.0 2.12.7 3.1.12 - 7.0.0 8.3.6 0.4 + 2.3.0 + 0.14.9 + 2.4.8 + 3.5.2 + 2.8.2 7.7.0 3.3.1 1.20.0 @@ -196,7 +195,7 @@ 3.16.0 4.4 1.12.0 - 1.27.0 + 1.27.1 3.6.1 1.11.0 2.11.0 @@ -241,26 +240,43 @@ 2.12.19 2.12 - - 24.1.0 - 2.0.10 - 3.9.0 + 2.1.0 3.26.3 2.0.0 1.5.4 - 9.4.55.v20240627 9.40 + 3.6.1 + 9.4.55.v20240627 + 7.0.0 + 5.7.0 + 3.30.2-GA 1.78.1 0.27 5.14.0 + 2.2.16 + 0.10.4 + 9.7 2.8 + 2.0.20 + 24.1.0 + 3.9.0 + 2.24.0 + 3.4 + 0.10.0 + 2.4.13 + 2.5.2 + 0.10.1 + 0.3.1 7.10.2 5.12.0 - 3.16.1 + 3.16.2 1.20.1 + 2.3.232 + 3.1.19 + 3.2.19 @@ -772,6 +788,16 @@ parquet-avro ${parquet.version} + + org.apache.parquet + parquet-column + ${parquet.version} + + + org.apache.parquet + parquet-hadoop + ${parquet.version} + org.apache.orc orc-core @@ -816,12 +842,6 @@ org.apache.helix helix-core ${helix.version} - - - org.apache.logging.log4j - log4j-slf4j-impl - - org.openjdk.jmh @@ -831,21 +851,6 @@ - - org.slf4j - jcl-over-slf4j - ${slf4j.version} - - - org.slf4j - jul-to-slf4j - ${slf4j.version} - - - org.slf4j - slf4j-api - ${slf4j.version} - org.apache.logging.log4j log4j-api @@ -866,6 +871,27 @@ log4j-1.2-api ${log4j.version} + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + + + org.slf4j + jcl-over-slf4j + ${slf4j.version} + + + org.slf4j + jul-to-slf4j + ${slf4j.version} + com.lmax disruptor @@ -887,11 +913,20 @@ larray-mmap 0.4.1 - 
org.apache.zookeeper zookeeper ${zookeeper.version} + + + ch.qos.logback + logback-core + + + ch.qos.logback + logback-classic + + @@ -1118,6 +1153,10 @@ ${hadoop.version} provided + + org.bouncycastle + bcprov-jdk15on + com.sun.jersey jersey-core @@ -1245,69 +1284,6 @@ hadoop-shaded-protobuf_3_21 1.2.0 - - - org.apache.kerby - kerb-core - ${kerby.version} - - - org.apache.kerby - kerb-simplekdc - ${kerby.version} - - - org.jline - jline - ${jline.version} - - - org.wildfly.common - wildfly-common - ${wildfly.version} - - - org.codehaus.jettison - jettison - ${jettison.version} - - - com.nimbusds - nimbus-jose-jwt - ${nimbus-jose-jwt.version} - - - - - org.eclipse.jetty.websocket - websocket-client - ${eclipse.jetty.version} - - - org.eclipse.jetty - jetty-server - ${eclipse.jetty.version} - - - org.eclipse.jetty - jetty-servlet - ${eclipse.jetty.version} - - - org.eclipse.jetty - jetty-util - ${eclipse.jetty.version} - - - org.eclipse.jetty - jetty-util-ajax - ${eclipse.jetty.version} - - - org.eclipse.jetty - jetty-webapp - ${eclipse.jetty.version} - @@ -1469,12 +1445,6 @@ hk2-metadata-generator ${hk2.version} - - - org.javassist - javassist - ${javassist.version} - io.swagger swagger-jersey2-jaxrs @@ -1492,36 +1462,6 @@ ${swagger-ui.version} - - org.apache.maven.surefire - surefire-testng - ${surefire.version} - - - org.jetbrains - annotations - 24.1.0 - - - com.h2database - h2 - 2.3.232 - - - com.github.jnr - jnr-posix - 3.1.19 - - - com.github.jnr - jnr-ffi - 2.2.16 - - - com.github.jnr - jnr-constants - 0.10.4 - info.picocli picocli @@ -1532,7 +1472,6 @@ tyrus-standalone-client 2.2.0 - net.sf.jopt-simple jopt-simple @@ -1557,7 +1496,7 @@ com.github.seancfoley ipaddress - 5.5.0 + 5.5.1 @@ -1570,37 +1509,41 @@ chronicle-core 2.26ea1 + - org.ow2.asm - asm - 9.7 + com.yscope.clp + clp-ffi + ${clp-ffi.version} - net.java.dev.jna - jna-platform - ${jna.version} + org.codehaus.woodstox + stax2-api + ${stax2-api.version} - net.java.dev.jna - jna - ${jna.version} + io.github.hakky54 + sslcontext-kickstart-for-netty + ${sslcontext.kickstart.version} - com.thoughtworks.paranamer - paranamer - ${paranamer.version} + org.mindrot + jbcrypt + ${jbcrypt.version} - - com.yscope.clp - clp-ffi - ${clp-ffi.version} + org.scala-lang.modules + scala-xml_${scala.compat.version} + ${scala-xml.version} - - org.codehaus.woodstox - stax2-api - ${stax2-api.version} + io.circe + circe-parser_${scala.compat.version} + ${circe.version} + + + io.circe + circe-generic_${scala.compat.version} + ${circe.version} @@ -1638,74 +1581,60 @@ - org.apache.flink - flink-clients - ${flink.version} - - - org.apache.flink - flink-streaming-java - ${flink.version} - - - org.apache.flink - flink-java - ${flink.version} - - - - com.esotericsoftware.kryo - kryo - - + org.apache.kafka + kafka-clients + ${kafka2.version} - org.jetbrains.kotlin - kotlin-stdlib-jdk8 - ${kotlin.stdlib.version} + org.apache.kafka + kafka_${scala.compat.version} + ${kafka2.version} + - org.jetbrains.kotlin - kotlin-stdlib - ${kotlin.stdlib.version} + io.confluent + kafka-schema-registry-client + ${confluent.version} - org.jetbrains.kotlin - kotlin-stdlib-common - ${kotlin.stdlib.version} + io.confluent + kafka-avro-serializer + ${confluent.version} - org.jetbrains - annotations - ${jetbrains.annotations.version} + io.confluent + kafka-protobuf-serializer + ${confluent.version} + - com.squareup.okio - okio - ${okio.version} + org.apache.pulsar + pulsar-client + ${pulsar.version} - com.squareup.okio - okio-jvm - ${okio.version} + org.apache.pulsar + 
pulsar-client-admin + ${pulsar.version} + - com.fasterxml.woodstox - woodstox-core - ${woodstox.version} + org.apache.flink + flink-clients + ${flink.version} - io.github.hakky54 - sslcontext-kickstart-for-netty - ${sslcontext.kickstart.version} + org.apache.flink + flink-streaming-java + ${flink.version} - org.mindrot - jbcrypt - ${jbcrypt.version} + org.apache.flink + flink-java + ${flink.version} - + org.apache.lucene lucene-backward-codecs @@ -1726,9 +1655,104 @@ lucene-analysis-common ${lucene.version} - - + + + + org.apache.kerby + kerb-core + ${kerby.version} + + + org.apache.kerby + kerb-simplekdc + ${kerby.version} + + + org.jline + jline + ${jline.version} + + + org.wildfly.common + wildfly-common + ${wildfly.version} + + + org.codehaus.jettison + jettison + ${jettison.version} + + + com.nimbusds + nimbus-jose-jwt + ${nimbus-jose-jwt.version} + + + dnsjava + dnsjava + ${dnsjava.version} + + + + org.eclipse.jetty.websocket + websocket-client + ${eclipse.jetty.version} + + + org.eclipse.jetty + jetty-server + ${eclipse.jetty.version} + + + org.eclipse.jetty + jetty-servlet + ${eclipse.jetty.version} + + + org.eclipse.jetty + jetty-util + ${eclipse.jetty.version} + + + org.eclipse.jetty + jetty-util-ajax + ${eclipse.jetty.version} + + + org.eclipse.jetty + jetty-webapp + ${eclipse.jetty.version} + + + + com.fasterxml.woodstox + woodstox-core + ${woodstox.version} + + + + org.apache.curator + curator-client + ${curator.version} + + + org.apache.curator + curator-framework + ${curator.version} + + + org.apache.curator + curator-recipes + ${curator.version} + + + + org.javassist + javassist + ${javassist.version} + + org.bouncycastle bcpkix-jdk18on @@ -1749,13 +1773,131 @@ bcprov-ext-jdk18on ${bouncycastle.version} - io.airlift aircompressor ${aircompressor.version} + + + net.java.dev.jna + jna + ${jna.version} + + + net.java.dev.jna + jna-platform + ${jna.version} + + + com.github.jnr + jnr-ffi + ${jnr-ffi.version} + + + com.github.jnr + jnr-constants + ${jnr-constants.version} + + + + org.ow2.asm + asm + ${asm.version} + + + + com.thoughtworks.paranamer + paranamer + ${paranamer.version} + + + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + ${kotlin.stdlib.version} + + + org.jetbrains.kotlin + kotlin-stdlib + ${kotlin.stdlib.version} + + + org.jetbrains.kotlin + kotlin-stdlib-common + ${kotlin.stdlib.version} + + + org.jetbrains.kotlin + kotlin-reflect + ${kotlin.stdlib.version} + + + com.squareup.okio + okio + ${okio.version} + + + com.squareup.okio + okio-jvm + ${okio.version} + + + + org.jetbrains + annotations + ${jetbrains.annotations.version} + + + + com.esotericsoftware.kryo + kryo + ${kryo.version} + + + org.objenesis + objenesis + ${objenesis.version} + + + com.twitter + chill-java + ${chill.version} + + + com.twitter + chill_2.11 + ${chill.version} + + + com.twitter + chill_2.12 + ${chill.version} + + + + com.zaxxer + HikariCP-java7 + ${HikariCP-java7.version} + + + + org.apache.ivy + ivy + ${ivy.version} + + + + com.mchange + c3p0 + ${c3p0.version} + + + com.mchange + mchange-commons-java + ${mchange-commons-java.version} + @@ -1794,12 +1936,30 @@ ${testcontainers.version} test - + + com.h2database + h2 + ${h2.version} + test + + + com.github.jnr + jnr-posix + ${jnr-posix.version} + test + + + org.scalatest + scalatest_${scala.compat.version} + ${scalatest.version} + test + clean install + kr.motd.maven @@ -1807,6 +1967,7 @@ 1.7.1 + @@ -1867,30 +2028,20 @@ - - org.apache.maven.plugins - maven-assembly-plugin - 3.7.1 - - - maven-resources-plugin - 3.3.1 - - - 
org.apache.maven.plugins - maven-dependency-plugin - 3.7.1 - org.apache.maven.plugins maven-javadoc-plugin - 3.8.0 none ${jdk.version} + + org.jacoco + jacoco-maven-plugin + 0.8.12 + org.apache.maven.plugins maven-source-plugin @@ -1903,19 +2054,9 @@ - - org.apache.maven.plugins - maven-eclipse-plugin - 2.10 - - true - true - - org.apache.maven.plugins maven-jar-plugin - ${maven-jar-plugin.version} @@ -1939,7 +2080,6 @@ org.apache.maven.plugins maven-surefire-plugin - ${surefire.version} 1 false @@ -1974,8 +2114,6 @@ org.apache.maven.plugins maven-enforcer-plugin - - ${maven-enforcer-plugin.version} enforce-dependency-convergence @@ -1998,14 +2136,18 @@ log4j:log4j - org.apache.logging.log4j:log4j-slf4j-impl org.slf4j:slf4j-log4j12 org.slf4j:slf4j-reload4j ch.qos.reload4j:reload4j + ch.qos.logback + + org.codehaus.jackson com.sun.jersey org.apache.hadoop.thirdparty:hadoop-shaded-protobuf_3_7 + + org.bouncycastle:bcprov-jdk15on @@ -2024,10 +2166,19 @@ license-maven-plugin 4.5 + + org.codehaus.mojo + buildnumber-maven-plugin + 3.2.0 + + + org.codehaus.mojo + versions-maven-plugin + 2.17.1 + org.apache.maven.plugins maven-release-plugin - 3.1.1 org.apache.maven.scm @@ -2044,8 +2195,7 @@ protobuf-maven-plugin 0.6.1 - com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} - + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} grpc-java io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier} @@ -2063,7 +2213,6 @@ org.apache.maven.plugins maven-compiler-plugin - 3.13.0 ${jdk.version} ${jdk.version} @@ -2072,11 +2221,6 @@ ${project.build.sourceEncoding} - - org.apache.maven.plugins - maven-remote-resources-plugin - 3.2.0 - com.googlecode.fmpp-maven-plugin fmpp-maven-plugin @@ -2151,13 +2295,46 @@ + + org.scalatest + scalatest-maven-plugin + 2.2.0 + + ${project.build.directory}/surefire-reports + . + false + + + + test + + test + + + + + + com.github.eirslett + frontend-maven-plugin + 1.15.0 + + + net.nicoulaj.maven.plugins + checksum-maven-plugin + 1.11 + + + org.codehaus.mojo + exec-maven-plugin + 3.4.1 + + org.apache.maven.plugins maven-checkstyle-plugin - 3.4.0 ${pinot.root}/config/checkstyle.xml ${pinot.root}/config/suppressions.xml @@ -2171,7 +2348,7 @@ com.puppycrawl.tools checkstyle - 10.17.0 + 10.18.0 @@ -2248,7 +2425,6 @@ org.jacoco jacoco-maven-plugin - 0.8.12 @@ -2273,11 +2449,6 @@ - - org.codehaus.mojo - sonar-maven-plugin - 2.7.1 - com.diffplug.spotless spotless-maven-plugin @@ -2285,7 +2456,6 @@ com.mycila license-maven-plugin - 4.5 @@ -2405,7 +2575,6 @@ org.apache.rat apache-rat-plugin - 0.16.1 verify @@ -2502,7 +2671,6 @@ org.codehaus.mojo buildnumber-maven-plugin - 3.2.0 validate @@ -2515,11 +2683,10 @@ org.codehaus.mojo versions-maven-plugin - 2.17.1 + org.apache.maven.plugins maven-shade-plugin - ${maven-shade-plugin.version} org.apache.logging.log4j @@ -2601,12 +2768,14 @@ + + org.apache.maven.plugins maven-jxr-plugin - 3.4.0 + 3.5.0 true @@ -2614,7 +2783,6 @@ org.apache.maven.plugins maven-project-info-reports-plugin - 3.6.2 false
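
The deleted-keys TTL change in the upsert metadata managers above no longer drops every deleted key past the threshold outright; it first counts the key as marked for deletion, keeps it if it is still within the TTL window, keeps it (in the consistent-deletes variant) if it is still referenced by more than one segment, and only then removes it from the primary-key map. The snippet below is a minimal, self-contained sketch of that branching; `Outcome`, `classify`, and the plain counters are illustrative stand-ins, not Pinot's `RecordLocation`/bitmap types.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only: mirrors the branching added in doRemoveExpiredPrimaryKeys(),
// but over plain values instead of Pinot's RecordLocation and queryable-doc-id bitmap.
public class DeletedKeyTtlSketch {
  enum Outcome { KEPT_WITHIN_TTL, KEPT_MULTI_SEGMENT, REMOVED }

  static Outcome classify(double comparisonValue, double deletedKeysThreshold, int distinctSegmentCount,
      AtomicInteger markedForDeletion, AtomicInteger withinTtlWindow, AtomicInteger inMultipleSegments,
      AtomicInteger removed) {
    // The record is already known to be deleted (not present in the queryable doc ids).
    markedForDeletion.incrementAndGet();
    if (comparisonValue >= deletedKeysThreshold) {
      // Still inside the TTL window: keep it in the primary-key map, only report the metric.
      withinTtlWindow.incrementAndGet();
      return Outcome.KEPT_WITHIN_TTL;
    }
    if (distinctSegmentCount > 1) {
      // Consistent-deletes variant: the key is still referenced by more than one segment, so
      // removing it now could let an uncompacted older segment revive it after a restart.
      inMultipleSegments.incrementAndGet();
      return Outcome.KEPT_MULTI_SEGMENT;
    }
    // Past the TTL window and confined to a single segment: safe to drop.
    removed.incrementAndGet();
    return Outcome.REMOVED;
  }

  public static void main(String[] args) {
    AtomicInteger marked = new AtomicInteger(), withinTtl = new AtomicInteger(),
        multiSegment = new AtomicInteger(), removed = new AtomicInteger();
    System.out.println(classify(100, 90, 1, marked, withinTtl, multiSegment, removed)); // KEPT_WITHIN_TTL
    System.out.println(classify(80, 90, 2, marked, withinTtl, multiSegment, removed));  // KEPT_MULTI_SEGMENT
    System.out.println(classify(80, 90, 1, marked, withinTtl, multiSegment, removed));  // REMOVED
  }
}
```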
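
The `ColumnMetadataImpl` change collapses the duplicated per-type switch into a single parse helper and sets the min and max values independently, so a column whose min (or max) string was too long to persist can still expose the other bound, which is what the new min/max-length tests exercise. Below is a simplified, standalone sketch of that pattern; it assumes a reduced type enum and omits Pinot's BYTES handling and special-character recovery for strings.

```java
import java.math.BigDecimal;

// Standalone sketch of the min/max parsing consolidation: one helper parses a stored string
// into the right Comparable per stored type, and min/max are applied independently so a
// segment can carry only one of them when the other was not persisted.
public class MinMaxParseSketch {
  enum StoredType { INT, LONG, FLOAT, DOUBLE, BIG_DECIMAL, STRING }

  static Comparable<?> parseValue(StoredType type, String column, String value) {
    switch (type) {
      case INT: return Integer.valueOf(value);
      case LONG: return Long.valueOf(value);
      case FLOAT: return Float.valueOf(value);
      case DOUBLE: return Double.valueOf(value);
      case BIG_DECIMAL: return new BigDecimal(value);
      case STRING: return value;
      default: throw new IllegalStateException("Unsupported data type: " + type + " for column: " + column);
    }
  }

  public static void main(String[] args) {
    String minString = null;  // min was not persisted (e.g. the value exceeded the length limit)
    String maxString = "42";
    Comparable<?> min = (minString == null) ? null : parseValue(StoredType.INT, "col1", minString);
    Comparable<?> max = (maxString == null) ? null : parseValue(StoredType.INT, "col1", maxString);
    System.out.println(min + " / " + max); // null / 42
  }
}
```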
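
The one-line fix in `ConcurrentMapTableUpsertMetadataManager` swaps the arms of the ternary so that `enableDeletedKeysCompactionConsistency` selects the consistent-deletes partition manager, which the updated `TableUpsertMetadataManagerFactoryTest` now asserts for partition 0. A minimal sketch of that selection pattern with stand-in types, not the real Pinot classes:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the partition-manager selection the patch corrects: the consistent-deletes
// flag must pick the "for consistent deletes" implementation, not the default one.
public class PartitionManagerSelectionSketch {
  interface PartitionManager { }
  static class DefaultManager implements PartitionManager { }
  static class ConsistentDeletesManager implements PartitionManager { }

  private final Map<Integer, PartitionManager> _managers = new ConcurrentHashMap<>();
  private final boolean _enableDeletedKeysCompactionConsistency;

  PartitionManagerSelectionSketch(boolean enableDeletedKeysCompactionConsistency) {
    _enableDeletedKeysCompactionConsistency = enableDeletedKeysCompactionConsistency;
  }

  PartitionManager getOrCreate(int partitionId) {
    return _managers.computeIfAbsent(partitionId, k -> _enableDeletedKeysCompactionConsistency
        ? new ConsistentDeletesManager()   // flag on -> consistent-deletes variant
        : new DefaultManager());           // flag off -> default manager
  }

  public static void main(String[] args) {
    System.out.println(new PartitionManagerSelectionSketch(true).getOrCreate(0).getClass().getSimpleName());
    System.out.println(new PartitionManagerSelectionSketch(false).getOrCreate(0).getClass().getSimpleName());
  }
}
```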
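
The new `IS_SECONDARY_WORKLOAD` constant is a query option consumed by the BinaryWorkloadScheduler, per the comment added in `CommonConstants`. The sketch below shows one way a client might set it in a broker request body; the literal keys `"sql"`, `"queryOptions"`, and `"isSecondaryWorkload"` come from this patch, while the semicolon-free `key=value` option string is an assumption about typical client usage rather than something this diff defines.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Hypothetical client-side sketch: tagging a broker query as a secondary workload via query options.
public class SecondaryWorkloadRequestSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    ObjectNode request = mapper.createObjectNode();
    request.put("sql", "SELECT COUNT(*) FROM myTable");
    // Routes the query to the constrained, limited-compute FCFS queue of the BinaryWorkloadScheduler.
    request.put("queryOptions", "isSecondaryWorkload=true");
    System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(request));
  }
}
```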