Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
3f967ac
First commit for RBAC support
soumitra-st Jun 30, 2023
fd944fb
Adding default authorization method for RBAC to call for APIs without…
soumitra-st Jun 30, 2023
a1708dc
Added RBAC Annotation for Broker and Controller APIs
soumitra-st Jul 8, 2023
a5149d0
Fixed unit test failures
soumitra-st Jul 9, 2023
39ded6c
Using schema name, since table name contains type (OFFLINE) suffix fo…
soumitra-st Jul 10, 2023
5ed6421
Fixed error message and changed /segments auth to cluster level
soumitra-st Jul 10, 2023
8ebb21e
Introduced Authorize annotation for finer grain authorization support…
soumitra-st Jul 14, 2023
fa5fb8c
Adding Authorize annotation definition
soumitra-st Jul 14, 2023
2d55f12
Moved all actions to a class to serve as a single place to document a…
soumitra-st Jul 15, 2023
c043f46
Defined more actions
soumitra-st Jul 16, 2023
4c34524
Fixed long line
soumitra-st Jul 16, 2023
b9c9731
Adding cluster name to access control factory config to be used by Ac…
soumitra-st Jul 17, 2023
25bd346
Refactored and commented code
soumitra-st Jul 18, 2023
f8da594
Fixed Checkstyle violations
soumitra-st Jul 18, 2023
bf66c10
Removed dev comments
soumitra-st Jul 18, 2023
e61d693
Added an interface for fine-grained access control to avoid code dupl…
soumitra-st Jul 24, 2023
0054e5e
Removing authorize annotation in test packages
soumitra-st Jul 24, 2023
d265412
Merge branch 'master' into rbac-impl
soumitra-st Jul 24, 2023
d682a20
Removed unused imports
soumitra-st Jul 24, 2023
fb74eca
Sanitized action names in <verb><noun> format and ordered the actions
soumitra-st Jul 24, 2023
bf0ec21
Fixed check style violations
soumitra-st Jul 24, 2023
33e1363
Action name changes and refactorings
soumitra-st Jul 25, 2023
5386f73
Using table name instead of schema name for auth check
soumitra-st Jul 26, 2023
e629beb
Fixed null check and renamed few actions
soumitra-st Jul 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@

import java.util.Set;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.auth.FineGrainedAccessControl;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;


@InterfaceAudience.Public
@InterfaceStability.Stable
public interface AccessControl {
public interface AccessControl extends FineGrainedAccessControl {
/**
* First-step access control when processing broker requests. Decides whether request is allowed to acquire resources
* for further processing. Request may still be rejected at table-level later on.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import javax.ws.rs.core.MediaType;
import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
import org.apache.pinot.common.utils.PinotAppConfigs;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.spi.env.PinotConfiguration;

import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
Expand All @@ -52,6 +55,7 @@ public class PinotBrokerAppConfigs {

@GET
@Path("/appconfigs")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_APP_CONFIG)
@Produces(MediaType.APPLICATION_JSON)
public String getAppConfigs() {
PinotConfiguration pinotConfiguration =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,17 @@
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.routing.RoutingTable;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
Expand Down Expand Up @@ -69,9 +76,13 @@ public class PinotBrokerDebug {
@Inject
private ServerRoutingStatsManager _serverRoutingStatsManager;

@Inject
AccessControlFactory _accessControlFactory;

@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/debug/timeBoundary/{tableName}")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_TIME_BOUNDARY)
@ApiOperation(value = "Get the time boundary information for a table")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Time boundary information for a table"),
Expand All @@ -93,6 +104,7 @@ public TimeBoundaryInfo getTimeBoundary(
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/debug/routingTable/{tableName}")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_ROUTING_TABLE)
@ApiOperation(value = "Get the routing table for a table")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Routing table"),
Expand Down Expand Up @@ -129,16 +141,30 @@ public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/debug/routingTable/sql")
@ManualAuthorization
@ApiOperation(value = "Get the routing table for a query")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Routing table"),
@ApiResponse(code = 404, message = "Routing not found"),
@ApiResponse(code = 500, message = "Internal server error")
})
public Map<ServerInstance, List<String>> getRoutingTableForQuery(
@ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query) {
RoutingTable routingTable = _routingManager.getRoutingTable(CalciteSqlCompiler.compileToBrokerRequest(query),
getRequestId());
@ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query,
@Context HttpHeaders httpHeaders) {
BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);

// TODO: Handle nested queries
if (brokerRequest.isSetQuerySource() && brokerRequest.getQuerySource().isSetTableName()) {
if (!_accessControlFactory.create()
.hasAccess(httpHeaders, TargetType.TABLE, brokerRequest.getQuerySource().getTableName(),
Actions.Table.GET_ROUTING_TABLE)) {
throw new WebApplicationException("Permission denied", Response.Status.FORBIDDEN);
}
} else {
throw new WebApplicationException("Table name is not set in the query", Response.Status.BAD_REQUEST);
}

RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId());
if (routingTable != null) {
return routingTable.getServerInstanceToSegmentsMap();
} else {
Expand All @@ -153,6 +179,7 @@ public Map<ServerInstance, List<String>> getRoutingTableForQuery(
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/debug/serverRoutingStats")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_SERVER_ROUTING_STATS)
@ApiOperation(value = "Get the routing stats for all the servers")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Server routing Stats"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;

import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;

Expand All @@ -58,6 +61,7 @@ public class PinotBrokerHealthCheck {
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("health")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_HEALTH)
@ApiOperation(value = "Checking broker health")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Broker is healthy"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
import org.apache.pinot.common.utils.LoggerUtils;
import org.apache.pinot.common.utils.log.DummyLogFileServer;
import org.apache.pinot.common.utils.log.LogFileServer;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;

import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;

Expand All @@ -61,6 +64,7 @@ public class PinotBrokerLogger {

@GET
@Path("/loggers")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_LOGGER)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get all the loggers", notes = "Return all the logger names")
public List<String> getLoggers() {
Expand All @@ -69,6 +73,7 @@ public List<String> getLoggers() {

@GET
@Path("/loggers/{loggerName}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_LOGGER)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get logger configs", notes = "Return logger info")
public Map<String, String> getLogger(
Expand All @@ -82,6 +87,7 @@ public Map<String, String> getLogger(

@PUT
@Path("/loggers/{loggerName}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.UPDATE_LOGGER)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Set logger level", notes = "Set logger level for a given logger")
public Map<String, String> setLoggerLevel(@ApiParam(value = "Logger name") @PathParam("loggerName") String loggerName,
Expand All @@ -91,6 +97,7 @@ public Map<String, String> setLoggerLevel(@ApiParam(value = "Logger name") @Path

@GET
@Path("/loggers/files")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_LOG_FILE)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get all local log files")
public Set<String> getLocalLogFiles() {
Expand All @@ -106,6 +113,7 @@ public Set<String> getLocalLogFiles() {

@GET
@Path("/loggers/download")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_LOG_FILE)
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@ApiOperation(value = "Download a log file")
public Response downloadLogFile(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;

import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;

Expand All @@ -52,6 +55,7 @@ public class PinotBrokerRouting {
@PUT
@Produces(MediaType.TEXT_PLAIN)
@Path("/routing/{tableName}")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.BUILD_ROUTING)
@ApiOperation(value = "Build/rebuild the routing for a table", notes = "Build/rebuild the routing for a table")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
Expand All @@ -66,6 +70,7 @@ public String buildRouting(
@PUT
@Produces(MediaType.TEXT_PLAIN)
@Path("/routing/refresh/{tableName}/{segmentName}")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.REFRESH_ROUTING)
@ApiOperation(value = "Refresh the routing for a segment", notes = "Refresh the routing for a segment")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
Expand All @@ -81,6 +86,7 @@ public String refreshRouting(
@DELETE
@Produces(MediaType.TEXT_PLAIN)
@Path("/routing/{tableName}")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.DELETE_ROUTING)
@ApiOperation(value = "Remove the routing for a table", notes = "Remove the routing for a table")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.ManualAuthorization;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
Expand Down Expand Up @@ -109,7 +112,8 @@ public class PinotClientRequest {
public void processSqlQueryGet(@ApiParam(value = "Query", required = true) @QueryParam("sql") String query,
@ApiParam(value = "Trace enabled") @QueryParam(Request.TRACE) String traceEnabled,
@ApiParam(value = "Debug options") @QueryParam(Request.DEBUG_OPTIONS) String debugOptions,
@Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext) {
@Suspended AsyncResponse asyncResponse, @Context org.glassfish.grizzly.http.server.Request requestContext,
@Context HttpHeaders httpHeaders) {
try {
ObjectNode requestJson = JsonUtils.newObjectNode();
requestJson.put(Request.SQL, query);
Expand All @@ -119,7 +123,7 @@ public void processSqlQueryGet(@ApiParam(value = "Query", required = true) @Quer
if (debugOptions != null) {
requestJson.put(Request.DEBUG_OPTIONS, debugOptions);
}
BrokerResponse brokerResponse = executeSqlQuery(requestJson, makeHttpIdentity(requestContext), true);
BrokerResponse brokerResponse = executeSqlQuery(requestJson, makeHttpIdentity(requestContext), true, httpHeaders);
asyncResponse.resume(brokerResponse.toJsonString());
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
Expand All @@ -141,14 +145,15 @@ public void processSqlQueryGet(@ApiParam(value = "Query", required = true) @Quer
})
@ManualAuthorization
public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResponse,
@Context org.glassfish.grizzly.http.server.Request requestContext) {
@Context org.glassfish.grizzly.http.server.Request requestContext,
@Context HttpHeaders httpHeaders) {
try {
JsonNode requestJson = JsonUtils.stringToJsonNode(query);
if (!requestJson.has(Request.SQL)) {
throw new IllegalStateException("Payload is missing the query string field 'sql'");
}
BrokerResponse brokerResponse =
executeSqlQuery((ObjectNode) requestJson, makeHttpIdentity(requestContext), false);
executeSqlQuery((ObjectNode) requestJson, makeHttpIdentity(requestContext), false, httpHeaders);
asyncResponse.resume(brokerResponse.toJsonString());
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
Expand All @@ -166,6 +171,7 @@ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResp

@DELETE
@Path("query/{queryId}")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the "
+ "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as "
Expand Down Expand Up @@ -201,6 +207,7 @@ public String cancelQuery(

@GET
@Path("queries")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_RUNNING_QUERY)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get running queries submitted via the requested broker", notes = "The id is assigned by the "
+ "requested broker and only unique at the scope of this broker")
Expand All @@ -217,7 +224,7 @@ public Map<Long, String> getRunningQueries() {
}

private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterIdentity httpRequesterIdentity,
boolean onlyDql)
boolean onlyDql, HttpHeaders httpHeaders)
throws Exception {
SqlNodeAndOptions sqlNodeAndOptions;
try {
Expand All @@ -234,7 +241,7 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
case DQL:
try (RequestScope requestStatistics = Tracing.getTracer().createRequestScope()) {
return _requestHandler.handleRequest(sqlRequestJson, sqlNodeAndOptions, httpRequesterIdentity,
requestStatistics);
requestStatistics, httpHeaders);
}
case DML:
Map<String, String> headers = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import javax.ws.rs.core.UriInfo;
import org.apache.pinot.broker.api.AccessControl;
import org.apache.pinot.broker.api.HttpRequesterIdentity;
import org.apache.pinot.core.auth.FineGrainedAuthUtils;
import org.apache.pinot.core.auth.ManualAuthorization;
import org.glassfish.grizzly.http.server.Request;

Expand Down Expand Up @@ -81,10 +82,13 @@ public void filter(ContainerRequestContext requestContext)

HttpRequesterIdentity httpRequestIdentity = HttpRequesterIdentity.fromRequest(request);

// default authorization handling
if (!accessControl.hasAccess(httpRequestIdentity)) {
throw new WebApplicationException("Failed access check for " + httpRequestIdentity.getEndpointUrl(),
Response.Status.FORBIDDEN);
}

FineGrainedAuthUtils.validateFineGrainedAuth(endpointMethod, uriInfo, _httpHeaders, accessControl);
}

private static boolean isBaseFile(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,11 @@ public void start()
_serverRoutingStatsManager.init();
_routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf);
_routingManager.init(_spectatorHelixManager);
final PinotConfiguration factoryConf = _brokerConf.subset(Broker.ACCESS_CONTROL_CONFIG_PREFIX);
// Adding cluster name to the config so that it can be used by the AccessControlFactory
factoryConf.setProperty(Helix.CONFIG_OF_CLUSTER_NAME, _brokerConf.getProperty(Helix.CONFIG_OF_CLUSTER_NAME));
_accessControlFactory =
AccessControlFactory.loadFactory(_brokerConf.subset(Broker.ACCESS_CONTROL_CONFIG_PREFIX), _propertyStore);
AccessControlFactory.loadFactory(factoryConf, _propertyStore);
HelixExternalViewBasedQueryQuotaManager queryQuotaManager =
new HelixExternalViewBasedQueryQuotaManager(_brokerMetrics, _instanceId);
queryQuotaManager.init(_spectatorHelixManager);
Expand Down
Loading