[SPARK-18372][SQL][Branch-1.6] Staging directory fails to be removed #15819

@@ -17,24 +17,33 @@

package org.apache.spark.sql.hive.execution

import java.io.IOException
import java.net.URI
import java.text.SimpleDateFormat
import java.util
import java.util.{Date, Random}

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils

import scala.util.control.NonFatal

import scala.collection.JavaConverters._
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.DataType
@@ -54,6 +63,63 @@ case class InsertIntoHiveTable(
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@transient private lazy val catalog = sc.catalog

@transient var createdTempDir: Option[Path] = None
val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)

private def executionId: String = {
val rand: Random = new Random
val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
Member

Why all this -- just use a UUID? You also have a redundant return and types here.

Author

Yes, it is. I did it this way because I want the code to be exactly the same as the Spark 2.0.x version.

executionId
}
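A minimal sketch of the simplification suggested in the thread above (hypothetical; the PR deliberately keeps the timestamp-plus-Random form so the code stays identical to the Spark 2.0.x version):

import java.util.UUID

// Hypothetical drop-in replacement for the executionId method above: a UUID is
// already unique, so the SimpleDateFormat, Random, explicit return, and local
// type annotations all go away.
private def executionId: String = "hive_" + UUID.randomUUID().toString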

private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
val inputPathUri: URI = inputPath.toUri
val inputPathName: String = inputPathUri.getPath
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
val stagingPathName: String =
if (inputPathName.indexOf(stagingDir) == -1) {
new Path(inputPathName, stagingDir).toString
} else {
inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
}
val dir: Path =
fs.makeQualified(
new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
logDebug("Created staging dir = " + dir + " for path = " + inputPath)
try {
if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
}
createdTempDir = Some(dir)
fs.deleteOnExit(dir)
}
catch {
case e: IOException =>
throw new RuntimeException(
Member

Don't use RuntimeException; why even handle this?

Author

The reason we use this code: (1) the old version relies on the Hive package to create the staging directory; in the Hive code, the staging directory is stored in a hash map, and those directories are only removed when the session is closed. However, our Spark code never triggers the Hive session close, so the directories are never removed. (2) The pushed code simulates the way Hive creates the staging directory inside Spark itself, rather than relying on Hive, so the staging directory can then be removed. (3) I will fix the return type issue. Thanks for your comments @srowen

Member

Almost all the code in this PR is copied from the existing master. This PR is just for branch-1.6.

"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)

}
return dir
Contributor

@fidato13 Nov 11, 2016

Can the return statement in the Scala code be removed, please?

Author

Thanks for your comment; I will update this and push it again.

Contributor

Thanks.

}
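For reference, a tiny self-contained sketch (hypothetical names, not part of the PR) of the idiom the review above asks for: in Scala the last expression of a method is its result, so the explicit return can simply be dropped.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ReturnIdiomSketch {
  // The qualified path is the last expression, so it is returned implicitly;
  // no `return` keyword is needed.
  def qualifiedStagingDir(raw: Path, conf: Configuration): Path = {
    val fs: FileSystem = raw.getFileSystem(conf)
    fs.makeQualified(raw)
  }
}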

private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
}

def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
getExtTmpPathRelTo(path.getParent, hadoopConf)
} else {
new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
}
}

def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
}

private def newSerializer(tableDesc: TableDesc): Serializer = {
val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
serializer.initialize(null, tableDesc.getProperties)
@@ -129,7 +195,9 @@ case class InsertIntoHiveTable(
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
val jobConf = new JobConf(sc.hiveconf)
val tmpLocation = getExternalTmpPath(tableLocation, jobConf)

val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val isCompressed = sc.hiveconf.getBoolean(
ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
@@ -175,7 +243,6 @@ case class InsertIntoHiveTable(
}
}

val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableJobConf(jobConf)

// When speculation is on and output committer class name contains "Direct", we should warn
@@ -260,6 +327,15 @@ case class InsertIntoHiveTable(
holdDDLTime)
}

// Attempt to delete the staging directory and the inclusive files. If failed, the files are
// expected to be dropped at the normal termination of VM since deleteOnExit is used.
try {
createdTempDir.foreach { path => path.getFileSystem(jobConf).delete(path, true) }
} catch {
case NonFatal(e) =>
logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
}

// Invalidate the cache.
sqlContext.cacheManager.invalidateCache(table)

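Condensed into one self-contained sketch (illustrative names only, not the PR's API), the cleanup strategy implemented in the file above: create the staging directory from Spark itself, register it with deleteOnExit as a fallback, and delete it explicitly once the insert finishes, so directories no longer wait for a Hive session close that never happens.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.util.control.NonFatal

object StagingDirSketch {
  // Runs `body` with a staging directory that is removed afterwards.
  def withStagingDir(dir: Path, conf: Configuration)(body: Path => Unit): Unit = {
    val fs: FileSystem = dir.getFileSystem(conf)
    if (!fs.mkdirs(dir)) {
      throw new IllegalStateException(s"Cannot create staging directory '$dir'")
    }
    fs.deleteOnExit(dir) // fallback: removed at JVM shutdown if the eager delete fails
    try {
      body(dir)
    } finally {
      try {
        fs.delete(dir, true) // eager cleanup once the insert completes
      } catch {
        case NonFatal(_) => () // best effort; deleteOnExit still covers this path
      }
    }
  }
}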
@@ -20,14 +20,17 @@ package org.apache.spark.sql.hive.client
import java.io.File

import org.apache.hadoop.util.VersionInfo

import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.Utils
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.hive.test.TestHiveSingleton


/**
* A simple set of tests that call the methods of a hive ClientInterface, loading different version
@@ -36,7 +39,7 @@ import org.apache.spark.util.Utils
* is not fully tested.
*/
@ExtendedHiveTest
class VersionsSuite extends SparkFunSuite with Logging {
class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {

// In order to speed up test execution during development or in Jenkins, you can specify the path
// of an existing Ivy cache:
@@ -216,5 +219,36 @@ class VersionsSuite extends SparkFunSuite with Logging {
"as 'COMPACT' WITH DEFERRED REBUILD")
client.reset()
}

test(s"$version: CREATE TABLE AS SELECT") {
withTable("tbl") {
sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
assert(sqlContext.table("tbl").collect().toSeq == Seq(Row(1)))
}
}

test(s"$version: Delete the temporary staging directory and files after each insert") {
withTempDir { tmpDir =>
withTable("tab", "tbl") {
sqlContext.sql(
s"""
|CREATE TABLE tab(c1 string)
|location '${tmpDir.toURI.toString}'
""".stripMargin)

sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
Member

You can create a temporary view instead of creating another table.

Author

Are temporary views supported in 1.6.x? I just used the HiveContext to create the view, but it does not work. Since this is a small test case, the created table here should be OK. Please advise. Thanks so much, Tao.

Member

In 1.6, the function is registerTempTable. The name was changed in 2.0 to temp view.
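For context, a minimal sketch of the rename (assuming a 1.6 SQLContext named sqlContext is in scope; the 2.x line is shown only for comparison and assumes a SparkSession named spark):

// Spark 1.6 (this branch): register a DataFrame as a temporary table.
val df = sqlContext.createDataFrame(Seq(Tuple1("a"))).toDF("value")
df.registerTempTable("tbl")

// Spark 2.x equivalent, renamed to "temp view":
// spark.createDataFrame(Seq(Tuple1("a"))).toDF("value").createOrReplaceTempView("tbl")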

Author

@merlintang Jan 3, 2017

Thanks Xiao, I have created a DataFrame and then registered it with registerTempTable as follows.

val df = sqlContext.createDataFrame((1 to 2).map(i => (i, "a"))).toDF("key", "value")
df.select("value").repartition(1).registerTempTable("tbl")

It works, but it looks a bit convoluted. What do you think?

Member

How about the following line?

Seq((1, "a")).toDF("key", "value").registerTempTable("tbl")

BTW, I am Xiao Li. : )

Member

You just want one column. Then, you can do it by

Seq(Tuple1("a")).toDF("value").registerTempTable("tbl")

Author

Sorry Xiao, it's because one of my best friends is named Tao. :) It is updated. Thanks again.

sqlContext.sql(s"INSERT OVERWRITE TABLE tab SELECT * from tbl ")

def listFiles(path: File): List[String] = {
val dir = path.listFiles()
val folders = dir.filter(_.isDirectory).toList
val filePaths = dir.map(_.getName).toList
folders.flatMap(listFiles) ++: filePaths
}
val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
assert(listFiles(tmpDir).sorted == expectedFiles)
}
}
}
}
}