[KYUUBI #6437] Fix Spark engine query result save to HDFS #6444

Status: Closed · wants to merge 1 commit
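Judging from the diff, the root cause is that the result-save directories were built with `java.nio.file.Paths`, which treats an HDFS URI as an ordinary local file path and collapses the `//` after the scheme, leaving writes aimed at `hdfs://...` pointed at an unusable location. The fix threads the URI-aware `org.apache.hadoop.fs.Path` through the engine, the operation, and the session manager instead. A minimal sketch of the difference (the base URI is invented for illustration; `Paths.get` behaves this way on a POSIX JVM):

```scala
import java.nio.file.Paths
import org.apache.hadoop.fs.Path

object PathJoinDemo extends App {
  val base = "hdfs://nameservice1/kyuubi/results"

  // java.nio.file.Paths normalizes the string as a local path,
  // collapsing "//" and breaking the URI scheme:
  println(Paths.get(base, "engine-1"))
  // hdfs:/nameservice1/kyuubi/results/engine-1

  // org.apache.hadoop.fs.Path parses the scheme and authority,
  // so the child path stays a valid HDFS URI:
  println(new Path(base, "engine-1"))
  // hdfs://nameservice1/kyuubi/results/engine-1
}
```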
File: SparkSQLEngine.scala

@@ -26,7 +26,6 @@ import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
 import com.google.common.annotations.VisibleForTesting
-import org.apache.hadoop.fs.Path
 import org.apache.spark.{ui, SparkConf}
 import org.apache.spark.kyuubi.{SparkContextHelper, SparkSQLEngineEventListener, SparkSQLEngineListener}
 import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
@@ -92,10 +91,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine")
     }
 
     if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
-      val path = new Path(engineSavePath)
-      val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
-      fs.mkdirs(path)
-      fs.deleteOnExit(path)
+      val fs = engineSavePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      fs.mkdirs(engineSavePath)
+      fs.deleteOnExit(engineSavePath)
     }
   }
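With `engineSavePath` now a Hadoop `Path`, the engine resolves the owning `FileSystem` directly from the path's scheme instead of rebuilding it from a string. `deleteOnExit` registers the directory for removal when that `FileSystem` is closed, which Hadoop's cache does from a JVM shutdown hook. A standalone sketch of the same pattern, with a hypothetical directory:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val scratch = new Path("hdfs://nameservice1/kyuubi/results/engine-1") // hypothetical
val fs = scratch.getFileSystem(new Configuration()) // FS chosen by the path's scheme
fs.mkdirs(scratch)       // also creates missing parent directories
fs.deleteOnExit(scratch) // removed when this FileSystem closes, e.g. at JVM shutdown
```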

@@ -113,10 +111,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine")
         Duration(60, TimeUnit.SECONDS))
     })
     try {
-      val path = new Path(engineSavePath)
-      val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
-      if (fs.exists(path)) {
-        fs.delete(path, true)
+      val fs = engineSavePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      if (fs.exists(engineSavePath)) {
+        fs.delete(engineSavePath, true)
       }
     } catch {
       case e: Throwable => error(s"Error cleaning engine result save path: $engineSavePath", e)
File: ExecuteStatement.scala

@@ -49,7 +49,7 @@ class ExecuteStatement(
   override protected def supportProgress: Boolean = true
 
   private var fetchOrcStatement: Option[FetchOrcStatement] = None
-  private var saveFileName: Option[String] = None
+  private var saveFilePath: Option[Path] = None
   override protected def resultSchema: StructType = {
     if (result == null || result.schema.isEmpty) {
       new StructType().add("Result", "string")
@@ -71,9 +71,8 @@ class ExecuteStatement(
   override def close(): Unit = {
     super.close()
     fetchOrcStatement.foreach(_.close())
-    saveFileName.foreach { p =>
-      val path = new Path(p)
-      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
+    saveFilePath.foreach { p =>
+      p.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(p, true)
     }
   }
@@ -179,7 +178,7 @@ class ExecuteStatement(
           resultSaveSizeThreshold,
           resultSaveRowsThreshold,
           result)) {
-        saveFileName =
+        saveFilePath =
           Some(
             session.sessionManager.asInstanceOf[SparkSQLSessionManager].getOperationResultSavePath(
               session.handle,
@@ -190,14 +189,14 @@ class ExecuteStatement(
         // df.write will introduce an extra shuffle for the outermost limit, and hurt performance
         if (resultMaxRows > 0) {
           result.toDF(colName: _*).limit(resultMaxRows).write
-            .option("compression", "zstd").format("orc").save(saveFileName.get)
+            .option("compression", "zstd").format("orc").save(saveFilePath.get.toString)
         } else {
           result.toDF(colName: _*).write
-            .option("compression", "zstd").format("orc").save(saveFileName.get)
+            .option("compression", "zstd").format("orc").save(saveFilePath.get.toString)
         }
-        info(s"Save result to ${saveFileName.get}")
+        info(s"Save result to ${saveFilePath.get}")
         fetchOrcStatement = Some(new FetchOrcStatement(spark))
-        return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema)
+        return fetchOrcStatement.get.getIterator(saveFilePath.get.toString, resultSchema)
       }
       val internalArray = if (resultMaxRows <= 0) {
         info("Execute in full collect mode")
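Note that Spark's `DataFrameWriter.save` and Kyuubi's `FetchOrcStatement.getIterator` still take a `String`, hence the `.toString` calls on the Hadoop `Path`. The underlying spill-to-ORC pattern looks roughly like this sketch (the session wiring and output URI are illustrative, not Kyuubi's internals):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("orc-spill-demo").getOrCreate()
val df = spark.range(100000).toDF("id")
val out = "hdfs://nameservice1/kyuubi/results/op-42" // hypothetical location

// Write the result set as zstd-compressed ORC...
df.write.option("compression", "zstd").format("orc").save(out)
// ...then stream it back instead of collecting everything on the driver.
val rows = spark.read.orc(out)
```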
File: SparkSQLSessionManager.scala

@@ -17,7 +17,6 @@
 
 package org.apache.kyuubi.engine.spark.session
 
-import java.nio.file.Paths
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
 import org.apache.hadoop.fs.Path
@@ -188,10 +187,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
     if (getSessionConf(KyuubiConf.OPERATION_RESULT_SAVE_TO_FILE, spark)) {
       val sessionSavePath = getSessionResultSavePath(sessionHandle)
       try {
-        val path = new Path(sessionSavePath)
-        val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
-        if (fs.exists(path)) {
-          fs.delete(path, true)
+        val fs = sessionSavePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+        if (fs.exists(sessionSavePath)) {
+          fs.delete(sessionSavePath, true)
           info(s"Deleted session result path: $sessionSavePath")
         }
       } catch {
@@ -211,17 +209,17 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
 
   override protected def isServer: Boolean = false
 
-  private[spark] def getEngineResultSavePath(): String = {
-    Paths.get(conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR), engineId).toString
+  private[spark] def getEngineResultSavePath(): Path = {
+    new Path(conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR), engineId)
   }
 
-  private def getSessionResultSavePath(sessionHandle: SessionHandle): String = {
-    Paths.get(getEngineResultSavePath(), sessionHandle.identifier.toString).toString
+  private def getSessionResultSavePath(sessionHandle: SessionHandle): Path = {
+    new Path(getEngineResultSavePath(), sessionHandle.identifier.toString)
   }
 
   private[spark] def getOperationResultSavePath(
       sessionHandle: SessionHandle,
-      opHandle: OperationHandle): String = {
-    Paths.get(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString).toString
+      opHandle: OperationHandle): Path = {
+    new Path(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString)
   }
 }
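These three helpers nest operation results under session directories under a single engine directory, which is what lets the engine-level and session-level cleanup hunks above remove whole subtrees with one recursive `fs.delete`. A sketch of the layout they produce (base directory and identifiers invented for illustration):

```scala
import org.apache.hadoop.fs.Path

val engine = new Path("hdfs://nameservice1/kyuubi/results", "spark-app-0001")
val session = new Path(engine, "sessionUUID")
val operation = new Path(session, "operationUUID")
println(operation)
// hdfs://nameservice1/kyuubi/results/spark-app-0001/sessionUUID/operationUUID
```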