
Commit 5db8912

yhuai authored and marmbrus committed
[SPARK-6618][SPARK-6669][SQL] Lock Hive metastore client correctly.
Author: Yin Huai <yhuai@databricks.com>
Author: Michael Armbrust <michael@databricks.com>

Closes apache#5333 from yhuai/lookupRelationLock and squashes the following commits:

59c884f [Michael Armbrust] [SQL] Lock metastore client in analyzeTable
7667030 [Yin Huai] Merge pull request #2 from marmbrus/pr/5333
e4a9b0b [Michael Armbrust] Correctly lock on MetastoreCatalog
d6fc32f [Yin Huai] Missing `)`.
1e241af [Yin Huai] Protect InsertIntoHive.
fee7e9c [Yin Huai] A test?
5416b0f [Yin Huai] Just protect client.
1 parent d3944b6 · commit 5db8912
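
The whole change is one pattern applied everywhere: the Hive metastore client is not thread-safe, so every call to it, from any code path, must be serialized on a single shared monitor — the HiveMetastoreCatalog instance that owns the client. A minimal sketch of that pattern, using stand-in types rather than Spark's actual classes:

    // Stand-ins for Hive's non-thread-safe client and Spark's catalog;
    // the names here are illustrative, not Spark's actual API.
    class MetastoreClient {
      def getTable(db: String, name: String): String = s"$db.$name"
      def alterTable(fullName: String): Unit = println(s"altered $fullName")
    }

    class Catalog {
      val client = new MetastoreClient
      // Calls made inside the catalog lock on the catalog instance itself.
      def lookup(db: String, name: String): String = synchronized {
        client.getTable(db, name)
      }
    }

    object Demo extends App {
      val catalog = new Catalog
      // External callers reuse the same monitor instead of a private lock,
      // so every route to the client is serialized consistently.
      catalog.synchronized {
        catalog.client.alterTable(catalog.lookup("default", "t"))
      }
    }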

File tree

4 files changed: +53 −27 lines

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 3 additions & 1 deletion
@@ -181,7 +181,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         val tableFullName =
           relation.hiveQlTable.getDbName + "." + relation.hiveQlTable.getTableName

-        catalog.client.alterTable(tableFullName, new Table(hiveTTable))
+        catalog.synchronized {
+          catalog.client.alterTable(tableFullName, new Table(hiveTTable))
+        }
       }
     case otherRelation =>
       throw new UnsupportedOperationException(
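
Here analyzeTable's alterTable call moves under the catalog's monitor. The crucial detail is whose lock is taken: a lock only excludes threads that lock the same object, so guarding the shared client with some other object's lock buys nothing. A sketch of the difference, with stand-in types:

    object MonitorSketch {
      class Client { def call(): Unit = () }          // non-thread-safe resource
      class CatalogLike { val client = new Client }
      val catalog = new CatalogLike

      // Ineffective: this locks the MonitorSketch object, so a component
      // locking anything else can still reach `client` concurrently.
      def unsafeUse(): Unit = synchronized { catalog.client.call() }

      // This commit's pattern: lock the catalog that owns the client, so
      // all components contend on the one monitor.
      def safeUse(): Unit = catalog.synchronized { catalog.client.call() }
    }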

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 10 additions & 4 deletions
@@ -67,7 +67,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
     override def load(in: QualifiedTableName): LogicalPlan = {
       logDebug(s"Creating new cached data source for $in")
-      val table = synchronized {
+      val table = HiveMetastoreCatalog.this.synchronized {
         client.getTable(in.database, in.name)
       }
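
This one-line change fixes a subtle bug: inside the anonymous CacheLoader, a bare `synchronized` locks the CacheLoader instance, not the enclosing catalog, so it never actually excluded the catalog's other metastore calls. Qualifying with `HiveMetastoreCatalog.this` names the intended monitor. The pitfall in isolation, with hypothetical names:

    class Outer {
      private var counter = 0

      val runner = new Runnable {
        override def run(): Unit = {
          // A bare `synchronized { ... }` here would lock this anonymous
          // Runnable, NOT the Outer instance, so bump() below would not be
          // excluded. Qualifying `this` restores the intended monitor.
          Outer.this.synchronized { counter += 1 }
        }
      }

      def bump(): Unit = synchronized { counter += 1 }  // locks the Outer instance
    }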

@@ -183,12 +183,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with

   def lookupRelation(
       tableIdentifier: Seq[String],
-      alias: Option[String]): LogicalPlan = synchronized {
+      alias: Option[String]): LogicalPlan = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
       hive.sessionState.getCurrentDatabase)
     val tblName = tableIdent.last
-    val table = try client.getTable(databaseName, tblName) catch {
+    val table = try {
+      synchronized {
+        client.getTable(databaseName, tblName)
+      }
+    } catch {
       case te: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
         throw new NoSuchTableException
     }
@@ -210,7 +214,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     } else {
       val partitions: Seq[Partition] =
         if (table.isPartitioned) {
-          HiveShim.getAllPartitionsOf(client, table).toSeq
+          synchronized {
+            HiveShim.getAllPartitionsOf(client, table).toSeq
+          }
         } else {
           Nil
         }
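
Note that lookupRelation also stops being synchronized as a whole: the method previously held the catalog lock for its entire body, serializing plan construction along with the metastore access. Now only the two client calls (getTable and getAllPartitionsOf) sit in critical sections, so the rest can run concurrently. Schematically, with hypothetical helpers:

    object LockScopeSketch {
      // Hypothetical stand-ins for the metastore call and plan construction.
      private def fetchFromMetastore(): String = "table"
      private def buildLogicalPlan(t: String): String = s"plan($t)"

      // Before: the whole method held the lock, serializing the pure
      // plan-building work along with the client call.
      def lookupBefore(): String = synchronized {
        buildLogicalPlan(fetchFromMetastore())
      }

      // After: only the non-thread-safe client call is inside the lock;
      // plan construction proceeds concurrently across threads.
      def lookupAfter(): String = {
        val table = synchronized { fetchFromMetastore() }
        buildLogicalPlan(table)
      }
    }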

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 29 additions & 22 deletions
@@ -50,7 +50,7 @@ case class InsertIntoHiveTable(
   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
   @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
-  @transient private lazy val db = Hive.get(sc.hiveconf)
+  @transient private lazy val catalog = sc.catalog

   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
@@ -199,38 +199,45 @@ case class InsertIntoHiveTable(
         orderedPartitionSpec.put(entry.getName,partitionSpec.get(entry.getName).getOrElse(""))
       }
       val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
-      db.validatePartitionNameCharacters(partVals)
+      catalog.synchronized {
+        catalog.client.validatePartitionNameCharacters(partVals)
+      }
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
       // which is currently considered as a Hive native command.
       val inheritTableSpecs = true
       // TODO: Correctly set isSkewedStoreAsSubdir.
       val isSkewedStoreAsSubdir = false
       if (numDynamicPartitions > 0) {
-        db.loadDynamicPartitions(
-          outputPath,
-          qualifiedTableName,
-          orderedPartitionSpec,
-          overwrite,
-          numDynamicPartitions,
-          holdDDLTime,
-          isSkewedStoreAsSubdir
-        )
+        catalog.synchronized {
+          catalog.client.loadDynamicPartitions(
+            outputPath,
+            qualifiedTableName,
+            orderedPartitionSpec,
+            overwrite,
+            numDynamicPartitions,
+            holdDDLTime,
+            isSkewedStoreAsSubdir)
+        }
       } else {
-        db.loadPartition(
+        catalog.synchronized {
+          catalog.client.loadPartition(
+            outputPath,
+            qualifiedTableName,
+            orderedPartitionSpec,
+            overwrite,
+            holdDDLTime,
+            inheritTableSpecs,
+            isSkewedStoreAsSubdir)
+        }
+      }
+    } else {
+      catalog.synchronized {
+        catalog.client.loadTable(
           outputPath,
           qualifiedTableName,
-          orderedPartitionSpec,
           overwrite,
-          holdDDLTime,
-          inheritTableSpecs,
-          isSkewedStoreAsSubdir)
+          holdDDLTime)
       }
-    } else {
-      db.loadTable(
-        outputPath,
-        qualifiedTableName,
-        overwrite,
-        holdDDLTime)
     }

     // Invalidate the cache.
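
The write path gets the same treatment: InsertIntoHiveTable drops its own `Hive.get(sc.hiveconf)` handle and routes validatePartitionNameCharacters, loadDynamicPartitions, loadPartition, and loadTable through `catalog.client`, each wrapped in `catalog.synchronized`, which puts writers on the same monitor as the readers in HiveMetastoreCatalog. A schematic of the resulting shape, with stand-in types:

    object WritePathSketch {
      class Client {
        def validate(parts: Seq[String]): Unit = ()
        def loadPartition(path: String): Unit = ()
        def loadTable(path: String): Unit = ()
      }
      class Catalog { val client = new Client }
      val catalog = new Catalog

      def write(path: String, partitioned: Boolean): Unit = {
        // Each client call is its own short critical section on the shared
        // catalog monitor; the lock is not held across the whole write.
        catalog.synchronized { catalog.client.validate(Seq("p=1")) }
        if (partitioned) catalog.synchronized { catalog.client.loadPartition(path) }
        else catalog.synchronized { catalog.client.loadTable(path) }
      }
    }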

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 11 additions & 0 deletions
@@ -457,4 +457,15 @@ class SQLQuerySuite extends QueryTest {
     dropTempTable("data")
     setConf("spark.sql.hive.convertCTAS", originalConf)
   }
+
+  test("sanity test for SPARK-6618") {
+    (1 to 100).par.map { i =>
+      val tableName = s"SPARK_6618_table_$i"
+      sql(s"CREATE TABLE $tableName (col1 string)")
+      catalog.lookupRelation(Seq(tableName))
+      table(tableName)
+      tables()
+      sql(s"DROP TABLE $tableName")
+    }
+  }
 }
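
The test is a stress check rather than a deterministic reproduction: `.par` turns the range into a Scala parallel collection (available out of the box on the Scala versions Spark used at the time), so the hundred create/lookup/drop cycles hit the metastore from multiple threads at once and an unsynchronized client would surface as flaky failures. A minimal illustration of what `.par` does:

    object ParDemo extends App {
      // Each element of the range may be processed on a different pool
      // thread; shared, non-thread-safe state touched in the closure races.
      val results = (1 to 8).par.map(i => Thread.currentThread().getName -> i)
      results.toList.sortBy(_._2).foreach(println)
    }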
