Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class MetastoreLock implements HiveLock {

private final ClientPool<IMetaStoreClient, TException> metaClients;

private final String catalogName;
private final String databaseName;
private final String tableName;
private final String fullName;
Expand All @@ -100,6 +101,7 @@ public MetastoreLock(Configuration conf, ClientPool<IMetaStoreClient, TException
String catalogName, String databaseName, String tableName) {
this.metaClients = metaClients;
this.fullName = catalogName + "." + databaseName + "." + tableName;
this.catalogName = catalogName;
this.databaseName = databaseName;
this.tableName = tableName;

Expand Down Expand Up @@ -277,6 +279,7 @@ private LockInfo createLock() throws LockException {

LockComponent lockComponent =
new LockComponent(LockType.EXCL_WRITE, LockLevel.TABLE, databaseName);
lockComponent.setCatName(catalogName);
lockComponent.setTablename(tableName);
LockRequest lockRequest =
new LockRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfForTest;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class BaseReplicationScenariosAcidTables {

protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
private static final Path REPLICA_EXTERNAL_BASE = new Path("/replica_external_base");
protected static final String PRIMARY_CAT_NAME = Warehouse.DEFAULT_CATALOG_NAME;
protected static String fullyQualifiedReplicaExternalBase;
static WarehouseInstance primary;
static WarehouseInstance replica, replicaNonAcid;
Expand Down Expand Up @@ -348,8 +350,8 @@ List<Long> openTxns(int numTxns, TxnStore txnHandler, HiveConf primaryConf) thro
return txns;
}

List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryDbName, Map<String, Long> tables,
TxnStore txnHandler,
List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryCatName, String primaryDbName,
Map<String, Long> tables, TxnStore txnHandler,
List<Long> txns, HiveConf primaryConf) throws Throwable {
AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest();
rqst.setDbName(primaryDbName);
Expand All @@ -361,6 +363,7 @@ List<Long> allocateWriteIdsForTablesAndAcquireLocks(String primaryDbName, Map<St
for (long txnId : txns) {
LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE,
primaryDbName);
comp.setCatName(primaryCatName);
comp.setTablename(entry.getKey());
comp.setOperationType(DataOperationType.UPDATE);
List<LockComponent> components = new ArrayList<LockComponent>(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ public void testReverseBootstrap() throws Throwable {
Map<String, Long> tablesInSecDb = new HashMap<>();
tablesInSecDb.put("t1", (long) numTxnsForSecDb + 4);
tablesInSecDb.put("t2", (long) numTxnsForSecDb + 4);
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
tearDownLockIds.addAll(lockIdsForSecDb);

Expand All @@ -576,8 +576,8 @@ public void testReverseBootstrap() throws Throwable {
Map<String, Long> tablesInSourceDb = new HashMap<>();
tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb + 6);
tablesInSourceDb.put("t2", (long) numTxnsForPrimaryDb);
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(replicatedDbName, tablesInSourceDb, txnHandler,
txnsForSourceDb, replica.getConf());
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, replicatedDbName,
tablesInSourceDb, txnHandler, txnsForSourceDb, replica.getConf());
tearDownLockIds.addAll(lockIdsForSourceDb);

//Open 1 txn with no hive locks acquired
Expand Down Expand Up @@ -1092,7 +1092,7 @@ private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
Map<String, Long> tablesInSecDb = new HashMap<>();
tablesInSecDb.put("t1", (long) numTxnsForSecDb);
tablesInSecDb.put("t2", (long) numTxnsForSecDb);
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
tearDownLockIds.addAll(lockIdsForSecDb);

Expand All @@ -1105,8 +1105,8 @@ private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
Map<String, Long> tablesInSourceDb = new HashMap<>();
tablesInSourceDb.put("t1", (long) numTxnsForPrimaryDb);
tablesInSourceDb.put("t5", (long) numTxnsForPrimaryDb);
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, tablesInSourceDb, txnHandler,
txnsForSourceDb, primary.getConf());
List<Long> lockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName,
tablesInSourceDb, txnHandler, txnsForSourceDb, primary.getConf());
tearDownLockIds.addAll(lockIdsForSourceDb);

//Open 1 txn with no hive locks acquired
Expand Down Expand Up @@ -1157,7 +1157,7 @@ private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
Map<String, Long> newTablesForSecDb = new HashMap<>();
newTablesForSecDb.put("t1", (long) numTxnsForSecDb + 1);
newTablesForSecDb.put("t2", (long) numTxnsForSecDb + 1);
List<Long> newLockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
List<Long> newLockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
newTablesForSecDb, txnHandler, newTxnsForSecDb, primaryConf);
tearDownLockIds.addAll(newLockIdsForSecDb);

Expand All @@ -1169,8 +1169,8 @@ private List<String> setUpFirstIterForOptimisedBootstrap() throws Throwable {
Map<String, Long> newTablesInSourceDb = new HashMap<>();
newTablesInSourceDb.put("t1", (long) 5);
newTablesInSourceDb.put("t5", (long) 3);
List<Long> newLockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, newTablesInSourceDb, txnHandler,
newTxnsForSourceDb, primary.getConf());
List<Long> newLockIdsForSourceDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName,
newTablesInSourceDb, txnHandler, newTxnsForSourceDb, primary.getConf());
tearDownLockIds.addAll(newLockIdsForSourceDb);

//Open 1 txn with no hive locks acquired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ public void testCompleteFailoverWithReverseBootstrap() throws Throwable {
Map<String, Long> tablesInSecDb = new HashMap<>();
tablesInSecDb.put("t1", (long) numTxnsForSecDb);
tablesInSecDb.put("t2", (long) numTxnsForSecDb);
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);

//Open 2 txns for Primary Db
Expand All @@ -536,7 +536,7 @@ public void testCompleteFailoverWithReverseBootstrap() throws Throwable {
Map<String, Long> tablesInPrimaryDb = new HashMap<>();
tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName,
List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName,
tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);

//Open 1 txn with no hive locks acquired
Expand Down Expand Up @@ -1508,7 +1508,8 @@ public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable {
Map<String, Long> tables = new HashMap<>();
tables.put("t1", numTxns + 1L);
tables.put("t2", numTxns + 2L);
List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, tables, txnHandler, txns, primaryConf);
List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName, tables,
txnHandler, txns, primaryConf);

// Bootstrap dump with open txn timeout as 1s.
List<String> withConfigs = Arrays.asList(
Expand Down Expand Up @@ -1627,7 +1628,7 @@ public void testAcidTablesBootstrapWithOpenTxnsDiffDb() throws Throwable {
Map<String, Long> tablesInSecDb = new HashMap<>();
tablesInSecDb.put("t1", (long) numTxns);
tablesInSecDb.put("t2", (long) numTxns);
List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
tablesInSecDb, txnHandler, txns, primaryConf);

// Bootstrap dump with open txn timeout as 300s.
Expand Down Expand Up @@ -1723,7 +1724,7 @@ public void testAcidTablesBootstrapWithOpenTxnsWaitingForLock() throws Throwable
Map<String, Long> tablesInSecDb = new HashMap<>();
tablesInSecDb.put("t1", (long) numTxns);
tablesInSecDb.put("t2", (long) numTxns);
List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
tablesInSecDb, txnHandler, txns, primaryConf);

WarehouseInstance.Tuple bootstrapDump = primary
Expand Down Expand Up @@ -1789,14 +1790,14 @@ public void testAcidTablesBootstrapWithOpenTxnsPrimaryAndSecondaryDb() throws Th
Map<String, Long> tablesInSecDb = new HashMap<>();
tablesInSecDb.put("t1", (long) numTxns);
tablesInSecDb.put("t2", (long) numTxns);
List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName + "_extra",
List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName + "_extra",
tablesInSecDb, txnHandler, txns, primaryConf);
// Allocate write ids for both tables of primary db for all txns
// t1=5+1L and t2=5+2L inserts
Map<String, Long> tablesInPrimDb = new HashMap<>();
tablesInPrimDb.put("t1", (long) numTxns + 1L);
tablesInPrimDb.put("t2", (long) numTxns + 2L);
lockIds.addAll(allocateWriteIdsForTablesAndAcquireLocks(primaryDbName,
lockIds.addAll(allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName,
tablesInPrimDb, txnHandler, txnsSameDb, primaryConf));

// Bootstrap dump with open txn timeout as 1s.
Expand Down Expand Up @@ -1864,7 +1865,8 @@ public void testAcidTablesBootstrapWithOpenTxnsAbortDisabled() throws Throwable
Map<String, Long> tables = new HashMap<>();
tables.put("t1", numTxns + 1L);
tables.put("t2", numTxns + 2L);
List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, tables, txnHandler, txns, primaryConf);
List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName, tables,
txnHandler, txns, primaryConf);

// Bootstrap dump with open txn timeout as 1s.
List<String> withConfigs = Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ public void testAcidTablesBootstrapDuringIncrementalWithOpenTxnsTimeout() throws
Map<String, Long> tables = new HashMap<>();
tables.put("t1", numTxns+2L);
tables.put("t2", numTxns+6L);
List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(primaryDbName, tables, txnHandler, txns, primaryConf);
List<Long> lockIds = allocateWriteIdsForTablesAndAcquireLocks(PRIMARY_CAT_NAME, primaryDbName, tables,
txnHandler, txns, primaryConf);

// Bootstrap dump with open txn timeout as 1s.
List<String> withConfigs = new LinkedList<>(dumpWithAcidBootstrapClause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private void assertResult(List<String> expected, List<String> actual) {
public void testMaterializationLockCleaned() throws Exception {
TxnStore txnHandler = TxnUtils.getTxnStore(conf);
OpenTxnsResponse response = txnHandler.openTxns(new OpenTxnRequest(1, "user", "host"));
txnHandler.lockMaterializationRebuild("default", TABLE1, response.getTxn_ids().get(0));
txnHandler.lockMaterializationRebuild("hive", "default", TABLE1, response.getTxn_ids().get(0));

//Mimic the lock can be cleaned up
ValidTxnList validTxnList = Mockito.mock(ValidReadTxnList.class);
Expand Down
4 changes: 2 additions & 2 deletions parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,7 @@ inputFileFormat
tabTypeExpr
@init { pushMsg("specifying table types", state); }
@after { popMsg(state); }
: identifier (DOT^ identifier)?
: identifier (DOT^ identifier (DOT^ identifier)?)?
(identifier (DOT^
(
(KW_ELEM_TYPE) => KW_ELEM_TYPE
Expand Down Expand Up @@ -1346,7 +1346,7 @@ showStatement
| KW_SHOW KW_TBLPROPERTIES tableName (LPAREN prptyName=StringLiteral RPAREN)? -> ^(TOK_SHOW_TBLPROPERTIES tableName $prptyName?)
| KW_SHOW KW_LOCKS
(
(KW_DATABASE|KW_SCHEMA) => (KW_DATABASE|KW_SCHEMA) (dbName=identifier) (isExtended=KW_EXTENDED)? -> ^(TOK_SHOWDBLOCKS $dbName $isExtended?)
(KW_DATABASE|KW_SCHEMA) => (KW_DATABASE|KW_SCHEMA) (name=databaseName) (isExtended=KW_EXTENDED)? -> ^(TOK_SHOWDBLOCKS $name $isExtended?)
|
(parttype=partTypeExpr)? (isExtended=KW_EXTENDED)? -> ^(TOK_SHOWLOCKS $parttype? $isExtended?)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@

import java.util.Map;

import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.ddl.DDLSemanticAnalyzerFactory.DDLType;
import org.apache.hadoop.hive.ql.ddl.DDLUtils;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.session.SessionState;

/**
* Analyzer for show locks commands.
Expand All @@ -44,29 +49,51 @@ public ShowLocksAnalyzer(QueryState queryState) throws SemanticException {
public void analyzeInternal(ASTNode root) throws SemanticException {
ctx.setResFile(ctx.getLocalTmpPath());

String tableName = null;
String fullyQualifiedTableName = null;
Map<String, String> partitionSpec = null;
boolean isExtended = false;
if (root.getChildCount() >= 1) {
// table for which show locks is being executed
for (int i = 0; i < root.getChildCount(); i++) {
ASTNode child = (ASTNode) root.getChild(i);
if (child.getType() == HiveParser.TOK_TABTYPE) {
tableName = DDLUtils.getFQName((ASTNode) child.getChild(0));
fullyQualifiedTableName = DDLUtils.getFQName((ASTNode) child.getChild(0));
// get partition metadata if partition specified
if (child.getChildCount() == 2) {
ASTNode partitionSpecNode = (ASTNode) child.getChild(1);
partitionSpec = getValidatedPartSpec(getTable(tableName), partitionSpecNode, conf, false);
partitionSpec = getValidatedPartSpec(getTable(fullyQualifiedTableName), partitionSpecNode, conf, false);
}
} else if (child.getType() == HiveParser.KW_EXTENDED) {
isExtended = true;
}
}
}

String catalogName = null;
String dbName = null;
String tableName = null;

if (fullyQualifiedTableName != null) {
String defaultDatabase = SessionState.get() != null
? SessionState.get().getCurrentDatabase()
: Warehouse.DEFAULT_DATABASE_NAME;
TableName fullyQualifiedTableNameObject = TableName.fromString(fullyQualifiedTableName,
HiveUtils.getCurrentCatalogOrDefault(conf), defaultDatabase);
catalogName = fullyQualifiedTableNameObject.getCat();
dbName = fullyQualifiedTableNameObject.getDb();
tableName = fullyQualifiedTableNameObject.getTable();

if (getCatalog(catalogName) == null) {
throw new SemanticException(ErrorMsg.CATALOG_NOT_EXISTS, catalogName);
} else if (getDatabase(catalogName, dbName, true) == null) {
throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS);
}
}

assert txnManager != null : "Transaction manager should be set before calling analyze";
ShowLocksDesc desc =
new ShowLocksDesc(ctx.getResFile(), tableName, partitionSpec, isExtended, txnManager.useNewShowLocksFormat());
new ShowLocksDesc(ctx.getResFile(), catalogName, dbName, tableName, partitionSpec,
isExtended, txnManager.useNewShowLocksFormat());
Task<DDLWork> task = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc));
rootTasks.add(task);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public class ShowLocksDesc implements DDLDesc, Serializable {
private static final String OLD_FORMAT_SCHEMA = "name,mode#string:string";
private static final String OLD_TBL_FORMAT_SCHEMA = "tab_name,mode#string:string";
private static final String OLD_DB_FORMAT_SCHEMA = "db_name,mode#string:string";
private static final String NEW_FORMAT_SCHEMA = "lockid,database,table,partition,lock_state," +
private static final String NEW_FORMAT_SCHEMA = "lockid,catalog,database,table,partition,lock_state," +
"blocked_by,lock_type,transaction_id,last_heartbeat,acquired_at,user,hostname,agent_info#" +
"string:string:string:string:string:string:string:string:string:string:string:string:string";
"string:string:string:string:string:string:string:string:string:string:string:string:string:string";

private final String resFile;
private final String catName;
Expand All @@ -58,11 +58,11 @@ public ShowLocksDesc(Path resFile, String catName, String dbName, boolean isExt,
this.isNewFormat = isNewFormat;
}

public ShowLocksDesc(Path resFile, String tableName, Map<String, String> partSpec, boolean isExt,
boolean isNewFormat) {
public ShowLocksDesc(Path resFile, String catName, String dbName, String tableName,
Map<String, String> partSpec, boolean isExt, boolean isNewFormat) {
this.resFile = resFile.toString();
this.catName = null;
this.dbName = null;
this.catName = catName;
this.dbName = dbName;
this.tableName = tableName;
this.partSpec = partSpec;
this.isExt = isExt;
Expand Down
Loading