[SPARK-18372][SQL][Branch-1.6] Staging directory fails to be removed #15819
Changes from all commits: ac65375, e537239, 8648a46, 881c96b, 15da7a8, 4f26b28, ab5e369
@@ -17,24 +17,33 @@
package org.apache.spark.sql.hive.execution

import java.io.IOException
import java.net.URI
import java.text.SimpleDateFormat
import java.util
import java.util.{Date, Random}

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils

import scala.util.control.NonFatal

import scala.collection.JavaConverters._
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.DataType
@@ -54,6 +63,63 @@ case class InsertIntoHiveTable(
  @transient private lazy val hiveContext = new Context(sc.hiveconf)
  @transient private lazy val catalog = sc.catalog

  @transient var createdTempDir: Option[Path] = None
  val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)

  private def executionId: String = {
    val rand: Random = new Random
    val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
    val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
    executionId
  }

  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
    val inputPathUri: URI = inputPath.toUri
    val inputPathName: String = inputPathUri.getPath
    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
    val stagingPathName: String =
      if (inputPathName.indexOf(stagingDir) == -1) {
        new Path(inputPathName, stagingDir).toString
      } else {
        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
      }
    val dir: Path =
      fs.makeQualified(
        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
    try {
      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
        throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
      }
      createdTempDir = Some(dir)
      fs.deleteOnExit(dir)
    } catch {
      case e: IOException =>
        throw new RuntimeException(
          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
    }

Review comment: Don't use RuntimeException; why even handle this?
Reply: The reason we use this code is: (1) the old version needs the Hive package to create the staging directory; in the Hive code that staging directory is stored in a hash map, and the staging directories are only removed when the Hive session is closed. Our Spark code never triggers the Hive session close, so the directories are never removed. (2) The pushed code simulates the Hive way of creating the staging directory inside Spark rather than relying on Hive, so the staging directory can then be removed. (3) I will fix the return type issue, thanks for your comments @srowen.
Reply: Almost all the code in this PR is copied from the existing master. This PR is just for branch-1.6.
(A standalone sketch of this cleanup pattern follows the diff for this file.)

    return dir

Review comment: Can the return statement in the Scala code be removed, please?
Reply: Thanks for your comment, I will update this and push it again.
Reply: Thanks.

  }

  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
  }

  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
    val extURI: URI = path.toUri
    if (extURI.getScheme == "viewfs") {
      getExtTmpPathRelTo(path.getParent, hadoopConf)
    } else {
      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
    }
  }

  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
  }

  private def newSerializer(tableDesc: TableDesc): Serializer = {
    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
    serializer.initialize(null, tableDesc.getProperties)
@@ -129,7 +195,9 @@ case class InsertIntoHiveTable(
    // instances within the closure, since Serializer is not serializable while TableDesc is.
    val tableDesc = table.tableDesc
    val tableLocation = table.hiveQlTable.getDataLocation
    val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
    val jobConf = new JobConf(sc.hiveconf)
    val tmpLocation = getExternalTmpPath(tableLocation, jobConf)

    val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
    val isCompressed = sc.hiveconf.getBoolean(
      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
@@ -175,7 +243,6 @@ case class InsertIntoHiveTable(
      }
    }

    val jobConf = new JobConf(sc.hiveconf)
    val jobConfSer = new SerializableJobConf(jobConf)

    // When speculation is on and output committer class name contains "Direct", we should warn
@@ -260,6 +327,15 @@ case class InsertIntoHiveTable(
      holdDDLTime)
    }

    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
    try {
      createdTempDir.foreach { path => path.getFileSystem(jobConf).delete(path, true) }
    } catch {
      case NonFatal(e) =>
        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
    }

    // Invalidate the cache.
    sqlContext.cacheManager.invalidateCache(table)
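For readers following the review thread above: the cleanup pattern this file introduces can be reduced to the standalone sketch below. It is illustrative only — the object name, the /tmp path, and the println logging are invented for the example, and plain FileSystem.mkdirs stands in for the Hive FileUtils.mkdir call used in the actual diff.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.util.control.NonFatal

object StagingDirCleanupSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val stagingDir = new Path("/tmp/.hive-staging_sketch") // hypothetical location
    val fs: FileSystem = stagingDir.getFileSystem(hadoopConf)

    // Create the staging directory and register it for deletion at JVM exit,
    // mirroring what getStagingDir does in the diff above.
    if (!fs.mkdirs(stagingDir)) {
      throw new IllegalStateException(s"Cannot create staging directory '$stagingDir'")
    }
    fs.deleteOnExit(stagingDir)

    // ... job output would be written under stagingDir and then loaded into the table ...

    // Delete the directory eagerly once the insert has finished; if this fails,
    // deleteOnExit still removes it at normal JVM termination.
    try {
      fs.delete(stagingDir, true)
    } catch {
      case NonFatal(e) => println(s"Unable to delete staging directory $stagingDir: $e")
    }
  }
}

The eager delete plus the deleteOnExit safety net stands in for Hive's session-close cleanup, which Spark 1.6 never triggers.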
@@ -20,14 +20,17 @@ package org.apache.spark.sql.hive.client
import java.io.File

import org.apache.hadoop.util.VersionInfo

import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.Utils
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton


/**
 * A simple set of tests that call the methods of a hive ClientInterface, loading different version
@@ -36,7 +39,7 @@ import org.apache.spark.util.Utils
 * is not fully tested.
 */
@ExtendedHiveTest
class VersionsSuite extends SparkFunSuite with Logging {
class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {

  // In order to speed up test execution during development or in Jenkins, you can specify the path
  // of an existing Ivy cache:
@@ -216,5 +219,36 @@ class VersionsSuite extends SparkFunSuite with Logging {
"as 'COMPACT' WITH DEFERRED REBUILD") | ||
client.reset() | ||
} | ||
|
||
test(s"$version: CREATE TABLE AS SELECT") { | ||
withTable("tbl") { | ||
sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a") | ||
assert(sqlContext.table("tbl").collect().toSeq == Seq(Row(1))) | ||
} | ||
} | ||
|
||
test(s"$version: Delete the temporary staging directory and files after each insert") { | ||
withTempDir { tmpDir => | ||
withTable("tab", "tbl") { | ||
sqlContext.sql( | ||
s""" | ||
|CREATE TABLE tab(c1 string) | ||
|location '${tmpDir.toURI.toString}' | ||
""".stripMargin) | ||
|
||
sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a") | ||

Review comment: You can create a temporary view instead of creating another table.
Reply: Is the temporary view supported in 1.6.x? I tried to use the HiveContext to create the view, but it did not work. Because this is a small test case, the created table here should be fine. Please advise. Thanks so much, Tao.
Reply: In 1.6, the function is registerTempTable.
Reply: Thanks Xiao, I have created a DataFrame and then called registerTempTable as follows: val df = sqlContext.createDataFrame((1 to 2).map(i => (i, "a"))).toDF("key", "value"). It works, but it looks a bit fuzzy. What do you think?
Reply: How about the following line? Seq((1, "a")).toDF("key", "value").registerTempTable("tbl") By the way, I am Xiao Li. : )
Reply: You just want one column, so you can do it with Seq(Tuple1("a")).toDF("value").registerTempTable("tbl")
Reply: Sorry Xiao, one of my best friends is named Tao. :) It is updated. Thanks again.
(A hedged sketch applying this registerTempTable suggestion to the test body follows the diff for this file.)

          sqlContext.sql(s"INSERT OVERWRITE TABLE tab SELECT * from tbl ")

          def listFiles(path: File): List[String] = {
            val dir = path.listFiles()
            val folders = dir.filter(_.isDirectory).toList
            val filePaths = dir.map(_.getName).toList
            folders.flatMap(listFiles) ++: filePaths
          }
          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
          assert(listFiles(tmpDir).sorted == expectedFiles)
        }
      }
    }
  }
}
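As referenced in the review thread above, here is a hedged sketch of the reviewer's registerTempTable suggestion applied to the test body; it is not the code merged in this PR, and the import of sqlContext.implicits._ and the column name chosen for the temporary table are assumptions for illustration.

// Sketch only (assumes the surrounding VersionsSuite test and its sqlContext):
// register a one-column temporary table instead of creating a second Hive table.
import sqlContext.implicits._

Seq(Tuple1("a")).toDF("c1").registerTempTable("tbl")
sqlContext.sql("INSERT OVERWRITE TABLE tab SELECT * FROM tbl")

The upside the reviewer points at is that no extra Hive table has to be created and dropped just to feed the INSERT OVERWRITE.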

Review comment: Why all this -- just use a UUID? You also have a redundant return and redundant types here.
Reply: Yes. I am doing it this way because I want the code to be exactly the same as the Spark 2.0.x version.
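For comparison, a minimal sketch of the UUID-based naming the reviewer suggests; the object and method below are hypothetical and are not code from this PR or from Spark 2.0.x.

import java.util.UUID

// Hypothetical alternative to the timestamp-plus-random executionId in the diff above:
// a random UUID is already unique, so no SimpleDateFormat or Random is needed.
object ExecutionIdSketch {
  def executionId: String = "hive_" + UUID.randomUUID().toString.replace("-", "_")
}

The reply above explains why the PR keeps the original form: the backport is meant to match the Spark 2.0.x code line for line.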