-
Notifications
You must be signed in to change notification settings - Fork 1.5k
[multistage] Table level Access Validation, QPS Quota, Phase Metrics for multistage queries #10534
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
3d26983
4954749
6080f46
e5744ec
042e8f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,11 +21,15 @@ | |
| import com.fasterxml.jackson.databind.JsonNode; | ||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.TimeUnit; | ||
| import javax.annotation.Nullable; | ||
| import org.apache.calcite.jdbc.CalciteSchemaBuilder; | ||
| import org.apache.calcite.plan.RelOptUtil; | ||
| import org.apache.calcite.rel.RelNode; | ||
| import org.apache.commons.lang3.StringUtils; | ||
| import org.apache.pinot.broker.api.RequesterIdentity; | ||
| import org.apache.pinot.broker.broker.AccessControlFactory; | ||
|
|
@@ -35,6 +39,7 @@ | |
| import org.apache.pinot.common.exception.QueryException; | ||
| import org.apache.pinot.common.metrics.BrokerMeter; | ||
| import org.apache.pinot.common.metrics.BrokerMetrics; | ||
| import org.apache.pinot.common.metrics.BrokerQueryPhase; | ||
| import org.apache.pinot.common.request.BrokerRequest; | ||
| import org.apache.pinot.common.response.BrokerResponse; | ||
| import org.apache.pinot.common.response.broker.BrokerResponseNative; | ||
|
|
@@ -145,7 +150,7 @@ private BrokerResponse handleRequest(long requestId, String query, | |
|
|
||
| long compilationStartTimeNs; | ||
| long queryTimeoutMs; | ||
| QueryPlan queryPlan; | ||
| QueryEnvironment.QueryPlannerResult queryPlanResult; | ||
| try { | ||
| // Parse the request | ||
| sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : RequestUtils.parseQuery(query, request); | ||
|
|
@@ -155,11 +160,18 @@ private BrokerResponse handleRequest(long requestId, String query, | |
| compilationStartTimeNs = System.nanoTime(); | ||
| switch (sqlNodeAndOptions.getSqlNode().getKind()) { | ||
| case EXPLAIN: | ||
| String plan = _queryEnvironment.explainQuery(query, sqlNodeAndOptions); | ||
| queryPlanResult = _queryEnvironment.explainQuery(query, sqlNodeAndOptions); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not necessarily for now because we don't send explain plan queries to server.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have the plan to support sending EXPLAIN queries to servers? If yes, probably we can add a TODO here in case if it's forgotten?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there's no reason to send EXPLAIN PLAN to server since broker has all the information the servers have in order to create the PLAN. the only information on server is whether segment A has inverted index; and segment B doesn't have it (e.g. it was generated with a different table config). and
|
||
| String plan = queryPlanResult.getExplainPlan(); | ||
| RelNode explainRelRoot = queryPlanResult.getRelRoot(); | ||
| if (!hasTableAccess(requesterIdentity, getTableNamesFromRelRoot(explainRelRoot), requestId, requestContext)) { | ||
| return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR); | ||
| } | ||
|
|
||
| return constructMultistageExplainPlan(query, plan); | ||
| case SELECT: | ||
| default: | ||
| queryPlan = _queryEnvironment.planQuery(query, sqlNodeAndOptions, requestId); | ||
| queryPlanResult = _queryEnvironment.planQuery(query, sqlNodeAndOptions, | ||
| requestId); | ||
| break; | ||
| } | ||
| } catch (Exception e) { | ||
|
|
@@ -169,6 +181,27 @@ private BrokerResponse handleRequest(long requestId, String query, | |
| return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e)); | ||
| } | ||
|
|
||
| QueryPlan queryPlan = queryPlanResult.getQueryPlan(); | ||
| Set<String> tableNames = getTableNamesFromRelRoot(queryPlanResult.getRelRoot()); | ||
vvivekiyer marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| // Compilation Time. This includes the time taken for parsing, compiling, create stage plans and assigning workers. | ||
| long compilationEndTimeNs = System.nanoTime(); | ||
| long compilationTimeNs = (compilationEndTimeNs - compilationStartTimeNs) + sqlNodeAndOptions.getParseTimeNs(); | ||
| updatePhaseTimingForTables(tableNames, BrokerQueryPhase.REQUEST_COMPILATION, compilationTimeNs); | ||
|
|
||
| // Validate table access. | ||
| if (!hasTableAccess(requesterIdentity, tableNames, requestId, requestContext)) { | ||
| return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR); | ||
| } | ||
| updatePhaseTimingForTables(tableNames, BrokerQueryPhase.AUTHORIZATION, System.nanoTime() - compilationEndTimeNs); | ||
|
|
||
| // Validate QPS quota | ||
| if (hasExceededQPSQuota(tableNames, requestId, requestContext)) { | ||
| String errorMessage = | ||
| String.format("Request %d: %s exceeds query quota.", requestId, query); | ||
| return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage)); | ||
| } | ||
|
|
||
| boolean traceEnabled = Boolean.parseBoolean( | ||
| request.has(CommonConstants.Broker.Request.TRACE) ? request.get(CommonConstants.Broker.Request.TRACE).asText() | ||
| : "false"); | ||
|
|
@@ -179,6 +212,7 @@ private BrokerResponse handleRequest(long requestId, String query, | |
| stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled)); | ||
| } | ||
|
|
||
| long executionStartTimeNs = System.nanoTime(); | ||
| try { | ||
| queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, _mailboxService, queryTimeoutMs, | ||
| sqlNodeAndOptions.getOptions(), stageIdStatsMap); | ||
|
|
@@ -189,6 +223,7 @@ private BrokerResponse handleRequest(long requestId, String query, | |
|
|
||
| BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2(); | ||
| long executionEndTimeNs = System.nanoTime(); | ||
| updatePhaseTimingForTables(tableNames, BrokerQueryPhase.QUERY_EXECUTION, executionEndTimeNs - executionStartTimeNs); | ||
|
|
||
| // Set total query processing time | ||
| long totalTimeMs = TimeUnit.NANOSECONDS.toMillis( | ||
|
|
@@ -204,11 +239,10 @@ private BrokerResponse handleRequest(long requestId, String query, | |
| } | ||
|
|
||
| BrokerResponseStats brokerResponseStats = new BrokerResponseStats(); | ||
| List<String> tableNames = queryPlan.getStageMetadataMap().get(entry.getKey()).getScannedTables(); | ||
| if (tableNames.size() > 0) { | ||
| if (!tableNames.isEmpty()) { | ||
| //TODO: Only using first table to assign broker metrics | ||
| // find a way to split metrics in case of multiple table | ||
| String rawTableName = TableNameBuilder.extractRawTableName(tableNames.get(0)); | ||
| String rawTableName = TableNameBuilder.extractRawTableName(tableNames.iterator().next()); | ||
| entry.getValue().setStageLevelStats(rawTableName, brokerResponseStats, _brokerMetrics); | ||
| } else { | ||
| entry.getValue().setStageLevelStats(null, brokerResponseStats, null); | ||
|
|
@@ -221,6 +255,51 @@ private BrokerResponse handleRequest(long requestId, String query, | |
| return brokerResponse; | ||
| } | ||
|
|
||
| /** | ||
| * Validates whether the requester has access to all the tables. | ||
| */ | ||
| private boolean hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames, long requestId, | ||
| RequestContext requestContext) { | ||
| boolean hasAccess = _accessControlFactory.create().hasAccess(requesterIdentity, tableNames); | ||
| if (!hasAccess) { | ||
| _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1); | ||
| LOGGER.warn("Access denied for requestId {}", requestId); | ||
| requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE); | ||
| return false; | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
||
| /** | ||
| * Returns true if the QPS quota of the tables has exceeded. | ||
| */ | ||
| private boolean hasExceededQPSQuota(Set<String> tableNames, long requestId, RequestContext requestContext) { | ||
| for (String tableName : tableNames) { | ||
| if (!_queryQuotaManager.acquire(tableName)) { | ||
| LOGGER.warn("Request {}: query exceeds quota for table: {}", requestId, tableName); | ||
| requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE); | ||
| String rawTableName = TableNameBuilder.extractRawTableName(tableName); | ||
| _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| private Set<String> getTableNamesFromRelRoot(RelNode relRoot) { | ||
| return new HashSet<>(RelOptUtil.findAllTableQualifiedNames(relRoot)); | ||
| } | ||
|
|
||
| private void updatePhaseTimingForTables(Set<String> tableNames, | ||
| BrokerQueryPhase phase, long time) { | ||
| for (String tableName : tableNames) { | ||
| String rawTableName = TableNameBuilder.extractRawTableName(tableName); | ||
| _brokerMetrics.addPhaseTiming(rawTableName, phase, time); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| private BrokerResponseNative constructMultistageExplainPlan(String sql, String plan) { | ||
| BrokerResponseNative brokerResponse = BrokerResponseNative.empty(); | ||
| List<Object[]> rows = new ArrayList<>(); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.