Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.broker.api;

import java.util.Set;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
Expand Down Expand Up @@ -47,4 +48,14 @@ default boolean hasAccess(RequesterIdentity requesterIdentity) {
* @return {@code true} if authorized, {@code false} otherwise
*/
boolean hasAccess(RequesterIdentity requesterIdentity, BrokerRequest brokerRequest);

/**
 * Fine-grained access control on Pinot tables: verifies that the requester is authorized
 * to query every table in the given set.
 *
 * @param requesterIdentity requester identity
 * @param tables Set of pinot tables used in the query. Table name can be with or without tableType.
 *
 * @return {@code true} if authorized for all tables, {@code false} otherwise
 */
boolean hasAccess(RequesterIdentity requesterIdentity, Set<String> tables);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.broker.broker;

import java.util.Set;
import org.apache.pinot.broker.api.AccessControl;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.request.BrokerRequest;
Expand All @@ -43,5 +44,10 @@ private static class AllowAllAccessControl implements AccessControl {
public boolean hasAccess(RequesterIdentity requesterIdentity, BrokerRequest brokerRequest) {
return true;
}

// Allow-all policy: every requester is authorized for every table, unconditionally.
@Override
public boolean hasAccess(RequesterIdentity requesterIdentity, Set<String> tables) {
  return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.pinot.broker.api.AccessControl;
import org.apache.pinot.broker.api.HttpRequesterIdentity;
Expand Down Expand Up @@ -77,18 +78,12 @@ public BasicAuthAccessControl(Collection<BasicAuthPrincipal> principals) {

@Override
public boolean hasAccess(RequesterIdentity requesterIdentity) {
return hasAccess(requesterIdentity, null);
return hasAccess(requesterIdentity, (BrokerRequest) null);
}

@Override
public boolean hasAccess(RequesterIdentity requesterIdentity, BrokerRequest brokerRequest) {
Preconditions.checkArgument(requesterIdentity instanceof HttpRequesterIdentity, "HttpRequesterIdentity required");
HttpRequesterIdentity identity = (HttpRequesterIdentity) requesterIdentity;

Collection<String> tokens = identity.getHttpHeaders().get(HEADER_AUTHORIZATION);
Optional<BasicAuthPrincipal> principalOpt =
tokens.stream().map(BasicAuthUtils::normalizeBase64Token).map(_token2principal::get).filter(Objects::nonNull)
.findFirst();
Optional<BasicAuthPrincipal> principalOpt = getPrincipalOpt(requesterIdentity);

if (!principalOpt.isPresent()) {
// no matching token? reject
Expand All @@ -104,5 +99,39 @@ public boolean hasAccess(RequesterIdentity requesterIdentity, BrokerRequest brok

return principal.hasTable(brokerRequest.getQuerySource().getTableName());
}

@Override
public boolean hasAccess(RequesterIdentity requesterIdentity, Set<String> tables) {
  // Resolve the authenticated principal from the request's auth token; no match means reject.
  Optional<BasicAuthPrincipal> maybePrincipal = getPrincipalOpt(requesterIdentity);
  if (!maybePrincipal.isPresent()) {
    // no matching token? reject
    return false;
  }

  // A query touching no tables carries no table-level restriction to enforce.
  if (tables == null || tables.isEmpty()) {
    return true;
  }

  // Authorized only if the principal is permitted on every referenced table.
  BasicAuthPrincipal authenticated = maybePrincipal.get();
  return tables.stream().allMatch(authenticated::hasTable);
}

/**
 * Resolves the requesting principal from the HTTP Authorization headers: the first header
 * token that normalizes to a configured principal wins.
 */
private Optional<BasicAuthPrincipal> getPrincipalOpt(RequesterIdentity requesterIdentity) {
  Preconditions.checkArgument(requesterIdentity instanceof HttpRequesterIdentity, "HttpRequesterIdentity required");
  HttpRequesterIdentity httpIdentity = (HttpRequesterIdentity) requesterIdentity;

  Collection<String> authHeaders = httpIdentity.getHttpHeaders().get(HEADER_AUTHORIZATION);
  return authHeaders.stream()
      .map(BasicAuthUtils::normalizeBase64Token)
      .map(_token2principal::get)
      .filter(Objects::nonNull)
      .findFirst();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
Expand Down Expand Up @@ -79,11 +81,42 @@ public BasicAuthAccessControl(AccessControlUserCache userCache) {

@Override
public boolean hasAccess(RequesterIdentity requesterIdentity) {
return hasAccess(requesterIdentity, null);
return hasAccess(requesterIdentity, (BrokerRequest) null);
}

@Override
public boolean hasAccess(RequesterIdentity requesterIdentity, BrokerRequest brokerRequest) {
  // A request with no resolvable table name imposes no table restriction? accept.
  boolean hasTableName = brokerRequest != null && brokerRequest.isSetQuerySource()
      && brokerRequest.getQuerySource().isSetTableName();
  if (!hasTableName) {
    return true;
  }

  // Delegate to the set-based check with the single referenced table.
  String tableName = brokerRequest.getQuerySource().getTableName();
  return hasAccess(requesterIdentity, Collections.singleton(tableName));
}

@Override
public boolean hasAccess(RequesterIdentity requesterIdentity, Set<String> tables) {
  // Authenticate first; credentials that match no configured principal are rejected.
  Optional<ZkBasicAuthPrincipal> maybePrincipal = getPrincipalAuth(requesterIdentity);
  if (!maybePrincipal.isPresent()) {
    // no matching token? reject
    return false;
  }

  // No referenced tables means nothing to authorize against.
  if (tables == null || tables.isEmpty()) {
    return true;
  }

  // Grant access only when the principal is allowed on every table in the query.
  ZkBasicAuthPrincipal authenticated = maybePrincipal.get();
  return tables.stream().allMatch(authenticated::hasTable);
}

private Optional<ZkBasicAuthPrincipal> getPrincipalAuth(RequesterIdentity requesterIdentity) {
Preconditions.checkArgument(requesterIdentity instanceof HttpRequesterIdentity,
"HttpRequesterIdentity required");
HttpRequesterIdentity identity = (HttpRequesterIdentity) requesterIdentity;
Expand All @@ -95,28 +128,15 @@ public boolean hasAccess(RequesterIdentity requesterIdentity, BrokerRequest brok


Map<String, String> name2password = tokens.stream().collect(Collectors
.toMap(BasicAuthUtils::extractUsername, BasicAuthUtils::extractPassword));
.toMap(BasicAuthUtils::extractUsername, BasicAuthUtils::extractPassword));
Map<String, ZkBasicAuthPrincipal> password2principal = name2password.keySet().stream()
.collect(Collectors.toMap(name2password::get, _name2principal::get));
.collect(Collectors.toMap(name2password::get, _name2principal::get));

Optional<ZkBasicAuthPrincipal> principalOpt =
password2principal.entrySet().stream()
.filter(entry -> BcryptUtils.checkpw(entry.getKey(), entry.getValue().getPassword()))
.map(u -> u.getValue()).filter(Objects::nonNull).findFirst();

if (!principalOpt.isPresent()) {
// no matching token? reject
return false;
}

ZkBasicAuthPrincipal principal = principalOpt.get();
if (brokerRequest == null || !brokerRequest.isSetQuerySource() || !brokerRequest.getQuerySource()
.isSetTableName()) {
// no table restrictions? accept
return true;
}

return principal.hasTable(brokerRequest.getQuerySource().getTableName());
password2principal.entrySet().stream()
.filter(entry -> BcryptUtils.checkpw(entry.getKey(), entry.getValue().getPassword()))
.map(u -> u.getValue()).filter(Objects::nonNull).findFirst();
return principalOpt;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.calcite.jdbc.CalciteSchemaBuilder;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.broker.AccessControlFactory;
Expand All @@ -35,6 +39,7 @@
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
Expand Down Expand Up @@ -145,7 +150,7 @@ private BrokerResponse handleRequest(long requestId, String query,

long compilationStartTimeNs;
long queryTimeoutMs;
QueryPlan queryPlan;
QueryEnvironment.QueryPlannerResult queryPlanResult;
try {
// Parse the request
sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : RequestUtils.parseQuery(query, request);
Expand All @@ -155,11 +160,18 @@ private BrokerResponse handleRequest(long requestId, String query,
compilationStartTimeNs = System.nanoTime();
switch (sqlNodeAndOptions.getSqlNode().getKind()) {
case EXPLAIN:
String plan = _queryEnvironment.explainQuery(query, sqlNodeAndOptions);
queryPlanResult = _queryEnvironment.explainQuery(query, sqlNodeAndOptions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For EXPLAIN cases, shouldn't we consider deducting the qps quota for that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily for now because we don't send explain plan queries to server.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have the plan to support sending EXPLAIN queries to servers? If yes, probably we can add a TODO here in case if it's forgotten?

Copy link
Contributor

@walterddr walterddr Apr 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's no reason to send EXPLAIN PLAN to server since broker has all the information the servers have in order to create the PLAN. the only information on server is whether segment A has inverted index; and segment B doesn't have it (e.g. it was generated with a different table config). and

  1. sending per-segment level physical plan info back to user is neither necessary nor practical
  2. we can always view segment level info via other APIs

String plan = queryPlanResult.getExplainPlan();
RelNode explainRelRoot = queryPlanResult.getRelRoot();
if (!hasTableAccess(requesterIdentity, getTableNamesFromRelRoot(explainRelRoot), requestId, requestContext)) {
return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
}

return constructMultistageExplainPlan(query, plan);
case SELECT:
default:
queryPlan = _queryEnvironment.planQuery(query, sqlNodeAndOptions, requestId);
queryPlanResult = _queryEnvironment.planQuery(query, sqlNodeAndOptions,
requestId);
break;
}
} catch (Exception e) {
Expand All @@ -169,6 +181,27 @@ private BrokerResponse handleRequest(long requestId, String query,
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}

QueryPlan queryPlan = queryPlanResult.getQueryPlan();
Set<String> tableNames = getTableNamesFromRelRoot(queryPlanResult.getRelRoot());

// Compilation Time. This includes the time taken for parsing, compiling, create stage plans and assigning workers.
long compilationEndTimeNs = System.nanoTime();
long compilationTimeNs = (compilationEndTimeNs - compilationStartTimeNs) + sqlNodeAndOptions.getParseTimeNs();
updatePhaseTimingForTables(tableNames, BrokerQueryPhase.REQUEST_COMPILATION, compilationTimeNs);

// Validate table access.
if (!hasTableAccess(requesterIdentity, tableNames, requestId, requestContext)) {
return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
}
updatePhaseTimingForTables(tableNames, BrokerQueryPhase.AUTHORIZATION, System.nanoTime() - compilationEndTimeNs);

// Validate QPS quota
if (hasExceededQPSQuota(tableNames, requestId, requestContext)) {
String errorMessage =
String.format("Request %d: %s exceeds query quota.", requestId, query);
return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage));
}

boolean traceEnabled = Boolean.parseBoolean(
request.has(CommonConstants.Broker.Request.TRACE) ? request.get(CommonConstants.Broker.Request.TRACE).asText()
: "false");
Expand All @@ -179,6 +212,7 @@ private BrokerResponse handleRequest(long requestId, String query,
stageIdStatsMap.put(stageId, new ExecutionStatsAggregator(traceEnabled));
}

long executionStartTimeNs = System.nanoTime();
try {
queryResults = _queryDispatcher.submitAndReduce(requestId, queryPlan, _mailboxService, queryTimeoutMs,
sqlNodeAndOptions.getOptions(), stageIdStatsMap);
Expand All @@ -189,6 +223,7 @@ private BrokerResponse handleRequest(long requestId, String query,

BrokerResponseNativeV2 brokerResponse = new BrokerResponseNativeV2();
long executionEndTimeNs = System.nanoTime();
updatePhaseTimingForTables(tableNames, BrokerQueryPhase.QUERY_EXECUTION, executionEndTimeNs - executionStartTimeNs);

// Set total query processing time
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(
Expand All @@ -204,11 +239,10 @@ private BrokerResponse handleRequest(long requestId, String query,
}

BrokerResponseStats brokerResponseStats = new BrokerResponseStats();
List<String> tableNames = queryPlan.getStageMetadataMap().get(entry.getKey()).getScannedTables();
if (tableNames.size() > 0) {
if (!tableNames.isEmpty()) {
//TODO: Only using first table to assign broker metrics
// find a way to split metrics in case of multiple table
String rawTableName = TableNameBuilder.extractRawTableName(tableNames.get(0));
String rawTableName = TableNameBuilder.extractRawTableName(tableNames.iterator().next());
entry.getValue().setStageLevelStats(rawTableName, brokerResponseStats, _brokerMetrics);
} else {
entry.getValue().setStageLevelStats(null, brokerResponseStats, null);
Expand All @@ -221,6 +255,51 @@ private BrokerResponse handleRequest(long requestId, String query,
return brokerResponse;
}

/**
 * Validates whether the requester has access to all the given tables. On denial, bumps the
 * global access-error meter, logs a warning, and records the error code on the request context.
 */
private boolean hasTableAccess(RequesterIdentity requesterIdentity, Set<String> tableNames, long requestId,
    RequestContext requestContext) {
  if (_accessControlFactory.create().hasAccess(requesterIdentity, tableNames)) {
    return true;
  }
  _brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
  LOGGER.warn("Access denied for requestId {}", requestId);
  requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
  return false;
}

/**
 * Returns {@code true} if the QPS quota of any of the given tables has been exceeded.
 * On the first table that fails to acquire quota, records the error code and bumps the
 * per-table quota-exceeded meter.
 */
private boolean hasExceededQPSQuota(Set<String> tableNames, long requestId, RequestContext requestContext) {
  for (String table : tableNames) {
    if (_queryQuotaManager.acquire(table)) {
      continue;
    }
    LOGGER.warn("Request {}: query exceeds quota for table: {}", requestId, table);
    requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
    _brokerMetrics.addMeteredTableValue(TableNameBuilder.extractRawTableName(table),
        BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
    return true;
  }
  return false;
}

/** Collects the qualified names of all tables referenced by the given relational plan root. */
private Set<String> getTableNamesFromRelRoot(RelNode relRoot) {
  List<String> qualifiedNames = RelOptUtil.findAllTableQualifiedNames(relRoot);
  return new HashSet<>(qualifiedNames);
}

/** Records the given phase timing against the raw table name of every table in the set. */
private void updatePhaseTimingForTables(Set<String> tableNames, BrokerQueryPhase phase, long time) {
  for (String table : tableNames) {
    _brokerMetrics.addPhaseTiming(TableNameBuilder.extractRawTableName(table), phase, time);
  }
}


private BrokerResponseNative constructMultistageExplainPlan(String sql, String plan) {
BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
List<Object[]> rows = new ArrayList<>();
Expand Down
Loading