HIVE-13125: Support masking and filtering of rows/columns (Pengcheng Xiong, reviewed by Ashutosh Chauhan)
pengchengxiong committed Mar 20, 2016
1 parent c0c08a3 commit a0a5371
Showing 27 changed files with 18,475 additions and 0 deletions.
@@ -25,6 +25,7 @@

import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAccessControlException;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
@@ -104,4 +105,32 @@ public void checkPrivileges(HiveOperationType hiveOpType, List<HivePrivilegeObje

}

public String getRowFilterExpression(String database, String table) throws SemanticException {
if (table.equals("masking_test")) {
return "key % 2 = 0 and key < 10";
} else if (table.equals("masking_test_subq")) {
return "key in (select key from src where src.key = masking_test_subq.key)";
}
return null;
}

public boolean needTransform() {
// In the future, we can add checking for username, groupname, etc based on
// HiveAuthenticationProvider. For example,
// "hive_test_user".equals(context.getUserName());
return true;
}

public boolean needTransform(String database, String table) {
return "masking_test".equals(table) || "masking_test_subq".equals(table);
}

public String getCellValueTransformer(String database, String table, String columnName)
throws SemanticException {
if (table.equals("masking_test") && columnName.equals("value")) {
return "reverse(value)";
}
return columnName;
}

}
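Taken together, the three hooks above cause a simple scan of masking_test to be expanded into a masking subquery. A rough sketch of the effect, assuming masking_test has the usual (key, value) schema of Hive's test tables (this illustration is not part of the commit):

String original = "select * from masking_test";
// key gets the identity transform ("key AS key"), value is reversed, and
// the row filter becomes the subquery's WHERE clause; the table name is
// reused as the alias:
String rewritten = "select * from "
    + "(SELECT key AS key, reverse(value) AS value"
    + " FROM masking_test"
    + " WHERE key % 2 = 0 and key < 10) masking_test";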
161 changes: 161 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -32,8 +32,10 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
@@ -67,6 +69,7 @@
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryProperties;
import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
@@ -311,13 +314,23 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
*/
boolean rootTasksResolved;

private final TableMask tableMask;

CreateTableDesc tableDesc;

/** Not thread-safe. */
final ASTSearcher astSearcher = new ASTSearcher();

protected AnalyzeRewriteContext analyzeRewrite;

// A mapping from a tableName to a table object in metastore.
Map<String, Table> tableNameToMetaDataTableObject;

// The tokens we should ignore when we are trying to do table masking.
private final Set<Integer> ignoredTokens = Sets.newHashSet(HiveParser.TOK_GROUPBY,
HiveParser.TOK_ORDERBY, HiveParser.TOK_WINDOWSPEC, HiveParser.TOK_CLUSTERBY,
HiveParser.TOK_DISTRIBUTEBY, HiveParser.TOK_SORTBY);

static class Phase1Ctx {
String dest;
int nextNum;
@@ -357,6 +370,8 @@ public SemanticAnalyzer(HiveConf conf) throws SemanticException {
globalLimitCtx = new GlobalLimitCtx();
viewAliasToInput = new HashMap<String, ReadEntity>();
noscan = partialscan = false;
tableMask = new TableMask(this, conf);
tableNameToMetaDataTableObject = new HashMap<>();
}

@Override
@@ -10307,6 +10322,145 @@ void setInsertToken(ASTNode ast, boolean isTmpFileDest) {
}
}

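// Caches metastore Table objects so that repeated references to the same
// table are resolved with a single metastore lookup.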
private Table getMetaDataTableObjectByName(String tableName) throws HiveException {
if (!tableNameToMetaDataTableObject.containsKey(tableName)) {
Table table = db.getTable(tableName);
tableNameToMetaDataTableObject.put(tableName, table);
return table;
} else {
return tableNameToMetaDataTableObject.get(tableName);
}
}

private void walkASTMarkTABREF(ASTNode ast, Set<String> cteAlias)
throws SemanticException {
Queue<Node> queue = new LinkedList<>();
queue.add(ast);
while (!queue.isEmpty()) {
ASTNode astNode = (ASTNode) queue.poll();
if (astNode.getToken().getType() == HiveParser.TOK_TABREF) {
int aliasIndex = 0;
StringBuffer additionalTabInfo = new StringBuffer();
for (int index = 1; index < astNode.getChildCount(); index++) {
ASTNode ct = (ASTNode) astNode.getChild(index);
// TODO: support TOK_TABLEBUCKETSAMPLE, TOK_TABLESPLITSAMPLE, and
// TOK_TABLEPROPERTIES
if (ct.getToken().getType() == HiveParser.TOK_TABLEBUCKETSAMPLE
|| ct.getToken().getType() == HiveParser.TOK_TABLESPLITSAMPLE
|| ct.getToken().getType() == HiveParser.TOK_TABLEPROPERTIES) {
additionalTabInfo.append(ctx.getTokenRewriteStream().toString(ct.getTokenStartIndex(),
ct.getTokenStopIndex()));
} else {
aliasIndex = index;
}
}

ASTNode tableTree = (ASTNode) (astNode.getChild(0));

String tabIdName = getUnescapedName(tableTree);

String alias;
if (aliasIndex != 0) {
alias = unescapeIdentifier(astNode.getChild(aliasIndex).getText());
} else {
alias = getUnescapedUnqualifiedTableName(tableTree);
}

// We need to know whether this reference is to a CTE or to a real table,
// since a CTE may have the same name as a table. For example,
//   with TAB2 as (select * from TAB1)  -- masking applies to TAB1
//   select * from TAB2                 -- no masking for the CTE reference
if (cteAlias.contains(tabIdName)) {
continue;
}

String replacementText = null;
Table table = null;
try {
table = getMetaDataTableObjectByName(tabIdName);
} catch (HiveException e) {
throw new SemanticException("Table " + tabIdName + " is not found.");
}

if (tableMask.needTransform(table.getDbName(), table.getTableName())) {
replacementText = tableMask.create(table, additionalTabInfo.toString(), alias);
}
if (replacementText != null) {
tableMask.setNeedsRewrite(true);
// we replace the tabref with replacementText here.
tableMask.addTableMasking(astNode, replacementText);
}
}
if (astNode.getChildCount() > 0 && !ignoredTokens.contains(astNode.getToken().getType())) {
for (Node child : astNode.getChildren()) {
queue.offer(child);
}
}
}
}
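// For orientation, a sketch of the TOK_TABREF child layout that the loop
// above assumes (inferred from the alias/sampling handling; illustration
// only, not part of the commit):
//   child 0:   TOK_TABNAME -> tabIdName ("db.tab" or "tab")
//   child i>0: TOK_TABLEBUCKETSAMPLE / TOK_TABLESPLITSAMPLE /
//              TOK_TABLEPROPERTIES -> appended to additionalTabInfo;
//              anything else -> treated as the alias (aliasIndex = i)
// When no alias child exists (aliasIndex == 0), the unqualified table name
// serves as the alias.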

// We walk through the AST and replace each TOK_TABREF with a masking and
// filtering subquery if the underlying table needs to be masked or filtered.
// For the replacement, we leverage the same methods that unparseTranslator
// uses.
public ASTNode rewriteASTWithMaskAndFilter(ASTNode ast) throws SemanticException {
// 1. collect information about CTEs, if there are any.
// The base tables of a CTE should be masked,
// but the CTE itself should not be masked when referenced in the main query.
Set<String> cteAlias = new HashSet<>();
if (ast.getChildCount() > 0
&& HiveParser.TOK_CTE == ((ASTNode) ast.getChild(0)).getToken().getType()) {
// the structure inside the CTE node looks like this:
// TOK_CTE
// TOK_SUBQUERY
// sq1 (may refer to sq2)
// ...
// TOK_SUBQUERY
// sq2
ASTNode cte = (ASTNode) ast.getChild(0);
// we start from sq2 and end up with sq1.
for (int index = cte.getChildCount() - 1; index >= 0; index--) {
ASTNode subq = (ASTNode) cte.getChild(index);
String alias = unescapeIdentifier(subq.getChild(1).getText());
if (cteAlias.contains(alias)) {
throw new SemanticException("Duplicate definition of " + alias);
} else {
cteAlias.add(alias);
walkASTMarkTABREF(subq, cteAlias);
}
}
// walk the other part of ast
for (int index = 1; index < ast.getChildCount(); index++) {
walkASTMarkTABREF((ASTNode) ast.getChild(index), cteAlias);
}
}
// there is no CTE; walk the whole AST
else {
walkASTMarkTABREF(ast, cteAlias);
}
// 2. rewrite the AST, replace TABREF with masking/filtering
if (tableMask.needsRewrite()) {
tableMask.applyTableMasking(ctx.getTokenRewriteStream());
String rewrittenQuery = ctx.getTokenRewriteStream().toString(ast.getTokenStartIndex(),
ast.getTokenStopIndex());
ASTNode rewrittenTree;
// Parse the rewritten query string
// check if we need to ctx.setCmd(rewrittenQuery);
ParseDriver pd = new ParseDriver();
try {
rewrittenTree = pd.parse(rewrittenQuery);
} catch (ParseException e) {
throw new SemanticException(e);
}
rewrittenTree = ParseUtils.findRootNonNullToken(rewrittenTree);
return rewrittenTree;
} else {
return ast;
}
}
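// A minimal sketch of how this rewrite slots into analysis (illustration
// only, not part of the commit; assumes a live Context and an authorizer
// like the test one above; the helper name is hypothetical):
private ASTNode maskAndFilterSketch(String command) throws Exception {
  ParseDriver pd = new ParseDriver();
  // Parse with the Context so that ctx.getTokenRewriteStream() is
  // populated, which rewriteASTWithMaskAndFilter relies on.
  ASTNode node = ParseUtils.findRootNonNullToken(pd.parse(command, ctx));
  // Returns the re-parsed tree with each TOK_TABREF replaced by its masking
  // subquery, or the original tree if no table needed masking.
  return rewriteASTWithMaskAndFilter(node);
}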

boolean genResolvedParseTree(ASTNode ast, PlannerContext plannerCtx) throws SemanticException {
ASTNode child = ast;
this.ast = ast;
Expand Down Expand Up @@ -10362,6 +10516,13 @@ else if(ast.getChild(0).getType() == HiveParser.TOK_FALSE) {
SessionState.get().setCommandType(SemanticAnalyzerFactory.getOperation(ast.getToken().getType()));
return false;
}

// Masking and filtering are applied here; the mechanism is similar to the
// one used by unparseTranslator.
if (!unparseTranslator.isEnabled() && tableMask.isEnabled()) {
child = rewriteASTWithMaskAndFilter(ast);
}

// 4. continue analyzing from the child ASTNode.
Phase1Ctx ctx_1 = initPhase1Ctx();
preProcessForInsert(child, qb);
127 changes: 127 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/TableMask.java
@@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse;

import java.util.List;

import org.antlr.runtime.TokenRewriteStream;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizer;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The main purpose of this class is authorization; more specifically, row
 * filtering and column masking are done through it. We first call the create
 * function to build the corresponding strings for row filtering and column
 * masking, and then replace each TOK_TABREF with those strings.
 */
public class TableMask {

protected final Logger LOG = LoggerFactory.getLogger(TableMask.class);
HiveAuthorizer authorizer;
private UnparseTranslator translator;
private boolean enable;
private boolean needsRewrite;

public TableMask(SemanticAnalyzer analyzer, HiveConf conf) throws SemanticException {
try {
authorizer = SessionState.get().getAuthorizerV2();
if (authorizer != null && needTransform()) {
enable = true;
translator = new UnparseTranslator(conf);
translator.enable();
}
} catch (Exception e) {
LOG.warn("Failed to initialize masking policy");
throw new SemanticException(e);
}
}

private String createRowMask(String db, String name) throws SemanticException {
return authorizer.getRowFilterExpression(db, name);
}

private String createExpressions(String db, String tbl, String colName) throws SemanticException {
return authorizer.getCellValueTransformer(db, tbl, colName);
}

public boolean isEnabled() throws SemanticException {
return enable;
}

public boolean needTransform() throws SemanticException {
return authorizer.needTransform();
}

public boolean needTransform(String database, String table) throws SemanticException {
return authorizer.needTransform(database, table);
}

public String create(Table table, String additionalTabInfo, String alias) throws SemanticException {
String db = table.getDbName();
String tbl = table.getTableName();
StringBuilder sb = new StringBuilder();
sb.append("(SELECT ");
List<FieldSchema> cols = table.getAllCols();
boolean firstOne = true;
for (FieldSchema fs : cols) {
if (!firstOne) {
sb.append(", ");
} else {
firstOne = false;
}
String colName = fs.getName();
String expr = createExpressions(db, tbl, colName);
if (expr == null) {
sb.append(colName);
} else {
sb.append(expr + " AS " + colName);
}
}
sb.append(" FROM " + tbl);
sb.append(" " + additionalTabInfo);
String filter = createRowMask(db, tbl);
if (filter != null) {
sb.append(" WHERE " + filter);
}
sb.append(")" + alias);
LOG.debug("TableMask creates `" + sb.toString() + "`");
return sb.toString();
}

void addTableMasking(ASTNode node, String replacementText) throws SemanticException {
translator.addTranslation(node, replacementText);
}

void applyTableMasking(TokenRewriteStream tokenRewriteStream) throws SemanticException {
translator.applyTranslations(tokenRewriteStream);
}

public boolean needsRewrite() {
return needsRewrite;
}

public void setNeedsRewrite(boolean needsRewrite) {
this.needsRewrite = needsRewrite;
}

}
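To see the shape of the string that create() emits without standing up a metastore, here is a self-contained sketch that mirrors its string building under the example policies from earlier in this commit (the class and all names are hypothetical, not part of the commit):

import java.util.LinkedHashMap;
import java.util.Map;

public class TableMaskSketch {
  public static void main(String[] args) {
    // Column -> transformed expression, mirroring getCellValueTransformer:
    Map<String, String> colToExpr = new LinkedHashMap<>();
    colToExpr.put("key", "key");              // identity transform
    colToExpr.put("value", "reverse(value)"); // masked column
    String rowFilter = "key % 2 = 0 and key < 10";

    StringBuilder sb = new StringBuilder("(SELECT ");
    boolean first = true;
    for (Map.Entry<String, String> e : colToExpr.entrySet()) {
      if (!first) {
        sb.append(", ");
      }
      first = false;
      sb.append(e.getValue()).append(" AS ").append(e.getKey());
    }
    sb.append(" FROM masking_test");
    if (rowFilter != null) {
      sb.append(" WHERE ").append(rowFilter);
    }
    sb.append(") masking_test"); // alias appended after the closing paren
    System.out.println(sb);
  }
}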
