Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
Expand All @@ -35,7 +37,6 @@
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils;
import org.apache.pinot.query.service.SubmissionService;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
Expand Down Expand Up @@ -112,21 +113,31 @@ public void submit(Worker.QueryRequest request, StreamObserver<Worker.QueryRespo
responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("Bad request").withCause(e).asException());
return;
}
// 2. Submit distributed stage plans
SubmissionService submissionService = new SubmissionService(_querySubmissionExecutorService);
distributedStagePlans.forEach(distributedStagePlan -> submissionService.submit(() -> {
_queryRunner.processQuery(distributedStagePlan, requestMetadata);
}));
// 3. await response successful or any failure which cancels all other tasks.
// 2. Submit distributed stage plans, await response successful or any failure which cancels all other tasks.
int numSubmission = distributedStagePlans.size();
CompletableFuture<?>[] submissionStubs = new CompletableFuture[numSubmission];
for (int i = 0; i < numSubmission; i++) {
DistributedStagePlan distributedStagePlan = distributedStagePlans.get(i);
submissionStubs[i] =
CompletableFuture.runAsync(() -> _queryRunner.processQuery(distributedStagePlan, requestMetadata),
_querySubmissionExecutorService);
}
try {
submissionService.awaitFinish(deadlineMs);
} catch (Throwable t) {
LOGGER.error("error occurred during stage submission for {}:\n{}", requestId, t);
CompletableFuture.allOf(submissionStubs).get(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOGGER.error("error occurred during stage submission for {}:\n{}", requestId, e);
responseObserver.onNext(Worker.QueryResponse.newBuilder()
.putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
QueryException.getTruncatedStackTrace(t)).build());
QueryException.getTruncatedStackTrace(e)).build());
responseObserver.onCompleted();
return;
} finally {
// Cancel all ongoing submission
for (CompletableFuture<?> future : submissionStubs) {
if (!future.isDone()) {
future.cancel(true);
}
}
}
responseObserver.onNext(
Worker.QueryResponse.newBuilder().putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK, "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
Expand Down Expand Up @@ -108,16 +109,9 @@ public void shutDown() {
_queryRunner.shutDown();
}

public void processQuery(DistributedStagePlan distributedStagePlan, Map<String, String> requestMetadataMap) {
_queryRunner.getExecutorService().submit(() -> {
try {
_queryRunner.processQuery(distributedStagePlan, requestMetadataMap);
} catch (Exception e) {
// TODO: Find a way to propagate the exception and fail the test
System.err.println("Caught exception while executing query");
e.printStackTrace(System.err);
throw new RuntimeException("Error executing query!", e);
}
});
public CompletableFuture<Void> processQuery(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap) {
return CompletableFuture.runAsync(() -> _queryRunner.processQuery(distributedStagePlan, requestMetadataMap),
_queryRunner.getExecutorService());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,16 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import java.util.stream.Collectors;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryServerEnclosure;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.config.table.TableType;
Expand All @@ -44,8 +39,6 @@
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -181,34 +174,22 @@ public void testSqlWithH2Checker(String sql)
*/
@Test(dataProvider = "testDataWithSqlExecutionExceptions")
public void testSqlWithExceptionMsgChecker(String sql, String exceptionMsg) {
long requestId = REQUEST_ID_GEN.getAndIncrement();
SqlNodeAndOptions sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sql);
QueryEnvironment.QueryPlannerResult queryPlannerResult =
_queryEnvironment.planQuery(sql, sqlNodeAndOptions, requestId);
DispatchableSubPlan dispatchableSubPlan = queryPlannerResult.getQueryPlan();
Map<String, String> requestMetadataMap = new HashMap<>();
requestMetadataMap.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, String.valueOf(requestId));
Long timeoutMsInQueryOption = QueryOptionsUtils.getTimeoutMs(sqlNodeAndOptions.getOptions());
long timeoutMs =
timeoutMsInQueryOption != null ? timeoutMsInQueryOption : CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS;
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS, String.valueOf(timeoutMs));
requestMetadataMap.put(CommonConstants.Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, "true");
requestMetadataMap.putAll(sqlNodeAndOptions.getOptions());
List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
for (int stageId = 1; stageId < stagePlans.size(); stageId++) {
processDistributedStagePlans(dispatchableSubPlan, stageId, requestMetadataMap);
}
try {
QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, Collections.emptyMap(), null,
_mailboxService);
Assert.fail("Should have thrown exception!");
} catch (RuntimeException e) {
// query pinot
List<Object[]> resultRows = queryRunner(sql, null);
Assert.fail(
"Expected error with message '" + exceptionMsg + "'. But instead rows were returned: " + resultRows.stream()
.map(Arrays::toString).collect(Collectors.joining(",\n")));
} catch (Exception e) {
// NOTE: The actual message is (usually) something like:
// Received error query execution result block: {200=QueryExecutionError:
// Query execution error on: Server_localhost_12345
// java.lang.IllegalArgumentException: Illegal Json Path: $['path'] does not match document
String exceptionMessage = e.getMessage();
Assert.assertTrue(exceptionMessage.startsWith("Received error query execution result block: "));
Assert.assertTrue(
exceptionMessage.startsWith("Received error query execution result block: ") || exceptionMessage.startsWith(
"Error occurred during stage submission"),
"Exception message didn't start with proper heading: " + exceptionMessage);
Assert.assertTrue(exceptionMessage.contains(exceptionMsg),
"Exception should contain: " + exceptionMsg + ", but found: " + exceptionMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.query.reduce.ExecutionStatsAggregator;
Expand Down Expand Up @@ -112,33 +115,51 @@ protected List<Object[]> queryRunner(String sql, Map<Integer, ExecutionStatsAggr
requestMetadataMap.put(CommonConstants.Broker.Request.TRACE, "true");
}

// Submission Stub logic are mimic {@link QueryServer}
List<DispatchablePlanFragment> stagePlans = dispatchableSubPlan.getQueryStageList();
List<CompletableFuture<?>> submissionStubs = new ArrayList<>();
for (int stageId = 0; stageId < stagePlans.size(); stageId++) {
if (stageId != 0) {
processDistributedStagePlans(dispatchableSubPlan, stageId, requestMetadataMap);
submissionStubs.addAll(processDistributedStagePlans(dispatchableSubPlan, stageId, requestMetadataMap));
}
if (executionStatsAggregatorMap != null) {
executionStatsAggregatorMap.put(stageId, new ExecutionStatsAggregator(true));
}
}
try {
CompletableFuture.allOf(submissionStubs.toArray(new CompletableFuture[0])).get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception e) {
// wrap and throw the exception here is for assert purpose on dispatch-time error
throw new RuntimeException("Error occurred during stage submission: " + QueryException.getTruncatedStackTrace(e));
} finally {
// Cancel all ongoing submission
for (CompletableFuture<?> future : submissionStubs) {
if (!future.isDone()) {
future.cancel(true);
}
}
}
// exception will be propagated through for assert purpose on runtime error
ResultTable resultTable =
QueryDispatcher.runReducer(requestId, dispatchableSubPlan, timeoutMs, Collections.emptyMap(),
executionStatsAggregatorMap, _mailboxService);
return resultTable.getRows();
}

protected void processDistributedStagePlans(DispatchableSubPlan dispatchableSubPlan, int stageId,
Map<String, String> requestMetadataMap) {
protected List<CompletableFuture<?>> processDistributedStagePlans(DispatchableSubPlan dispatchableSubPlan,
int stageId, Map<String, String> requestMetadataMap) {
Map<QueryServerInstance, List<Integer>> serverInstanceToWorkerIdMap =
dispatchableSubPlan.getQueryStageList().get(stageId).getServerInstanceToWorkerIdMap();
List<CompletableFuture<?>> submissionStubs = new ArrayList<>();
for (Map.Entry<QueryServerInstance, List<Integer>> entry : serverInstanceToWorkerIdMap.entrySet()) {
QueryServerInstance server = entry.getKey();
for (int workerId : entry.getValue()) {
DistributedStagePlan distributedStagePlan =
constructDistributedStagePlan(dispatchableSubPlan, stageId, new VirtualServerAddress(server, workerId));
_servers.get(server).processQuery(distributedStagePlan, requestMetadataMap);
submissionStubs.add(_servers.get(server).processQuery(distributedStagePlan, requestMetadataMap));
}
}
return submissionStubs;
}

protected static DistributedStagePlan constructDistributedStagePlan(DispatchableSubPlan dispatchableSubPlan,
Expand Down