[KYUUBI #6437] Fix Spark engine query result save to HDFS
This pull request fixes #6437

Use `org.apache.hadoop.fs.Path` instead of `java.nio.file.Paths`, to avoid unexpectedly changing the scheme of `OPERATION_RESULT_SAVE_TO_FILE_DIR`: `java.nio.file.Paths` normalizes the value as a local filesystem path, collapsing the `//` after the scheme and dropping the URI authority.
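For illustration, a minimal sketch of the scheme corruption (the `jfs://datalake/tmp` value comes from the bug report below; `engine-1` is a placeholder engine id):

```scala
import org.apache.hadoop.fs.Path

// java.nio.file.Paths treats the string as a local filesystem path and
// normalizes away the double slash, losing the URI authority.
val nioJoined = java.nio.file.Paths.get("jfs://datalake/tmp", "engine-1").toString
// => "jfs:/datalake/tmp/engine-1" (on a Unix default filesystem)

// org.apache.hadoop.fs.Path parses the string as a URI and keeps the
// scheme and authority intact.
val hadoopJoined = new Path("jfs://datalake/tmp", "engine-1").toString
// => "jfs://datalake/tmp/engine-1"
```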

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

With `kyuubi.operation.result.saveToFile.dir=jfs://datalake/tmp`, the Spark job fails to start with: `java.io.IOException: JuiceFS initialized failed for jfs:///`.

Directories like `hdfs://xxx:port/tmp` may hit similar errors.

Users can use an HDFS directory as `kyuubi.operation.result.saveToFile.dir` without error.

It seems no test suites were added in #5591 or #5986; I'll try to build a dist and test with our internal cluster.

---

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6444 from camper42/save-to-hdfs.

Closes #6437

990f0a7 [camper42] [Kyuubi #6437] Fix Spark engine query result save to HDFS

Authored-by: camper42 <camper.xlii@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit 71649da)
Signed-off-by: Cheng Pan <chengpan@apache.org>
camper42 authored and pan3793 committed Jun 4, 2024
1 parent f25e010 commit bb821d1
Showing 3 changed files with 23 additions and 29 deletions.
SparkSQLEngine.scala:

```diff
@@ -26,7 +26,6 @@ import scala.concurrent.duration.Duration
 import scala.util.control.NonFatal
 
 import com.google.common.annotations.VisibleForTesting
-import org.apache.hadoop.fs.Path
 import org.apache.spark.{ui, SparkConf}
 import org.apache.spark.kyuubi.{SparkContextHelper, SparkSQLEngineEventListener, SparkSQLEngineListener}
 import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
@@ -92,10 +91,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
     }
 
     if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
-      val path = new Path(engineSavePath)
-      val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
-      fs.mkdirs(path)
-      fs.deleteOnExit(path)
+      val fs = engineSavePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      fs.mkdirs(engineSavePath)
+      fs.deleteOnExit(engineSavePath)
     }
   }
@@ -113,10 +111,9 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
           Duration(60, TimeUnit.SECONDS))
       })
     try {
-      val path = new Path(engineSavePath)
-      val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
-      if (fs.exists(path)) {
-        fs.delete(path, true)
+      val fs = engineSavePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+      if (fs.exists(engineSavePath)) {
+        fs.delete(engineSavePath, true)
       }
     } catch {
       case e: Throwable => error(s"Error cleaning engine result save path: $engineSavePath", e)
```
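For context, a minimal standalone sketch (not part of the patch) of the pattern the engine now uses: the `FileSystem` is resolved from the `Path`'s own scheme and authority rather than from the default filesystem. The `hdfs://namenode:8020` URI is a hypothetical placeholder:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()
val savePath = new Path("hdfs://namenode:8020/tmp/kyuubi-results") // placeholder URI

// getFileSystem selects the FileSystem implementation matching the path's
// scheme (hdfs, jfs, s3a, ...), not the cluster's default filesystem.
val fs = savePath.getFileSystem(conf)
fs.mkdirs(savePath)
fs.deleteOnExit(savePath) // removed when the FileSystem is closed
```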
ExecuteStatement.scala:

```diff
@@ -49,7 +49,7 @@ class ExecuteStatement(
   override protected def supportProgress: Boolean = true
 
   private var fetchOrcStatement: Option[FetchOrcStatement] = None
-  private var saveFileName: Option[String] = None
+  private var saveFilePath: Option[Path] = None
   override protected def resultSchema: StructType = {
     if (result == null || result.schema.isEmpty) {
       new StructType().add("Result", "string")
@@ -71,9 +71,8 @@ class ExecuteStatement(
   override def close(): Unit = {
     super.close()
     fetchOrcStatement.foreach(_.close())
-    saveFileName.foreach { p =>
-      val path = new Path(p)
-      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
+    saveFilePath.foreach { p =>
+      p.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(p, true)
     }
   }
@@ -179,7 +178,7 @@ class ExecuteStatement(
             resultSaveSizeThreshold,
             resultSaveRowsThreshold,
             result)) {
-        saveFileName =
+        saveFilePath =
           Some(
             session.sessionManager.asInstanceOf[SparkSQLSessionManager].getOperationResultSavePath(
               session.handle,
@@ -191,14 +190,14 @@ class ExecuteStatement(
         // df.write will introduce an extra shuffle for the outermost limit, and hurt performance
         if (resultMaxRows > 0) {
           result.toDF(colName: _*).limit(resultMaxRows).write
-            .option("compression", codec).format("orc").save(saveFileName.get)
+            .option("compression", codec).format("orc").save(saveFilePath.get.toString)
         } else {
           result.toDF(colName: _*).write
-            .option("compression", codec).format("orc").save(saveFileName.get)
+            .option("compression", codec).format("orc").save(saveFilePath.get.toString)
         }
-        info(s"Save result to ${saveFileName.get}")
+        info(s"Save result to ${saveFilePath.get}")
         fetchOrcStatement = Some(new FetchOrcStatement(spark))
-        return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema)
+        return fetchOrcStatement.get.getIterator(saveFilePath.get.toString, resultSchema)
       }
       val internalArray = if (resultMaxRows <= 0) {
         info("Execute in full collect mode")
```
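A note on the `.toString` calls above: Spark's `DataFrameWriter.save` takes a `String`, so the Hadoop `Path` is stringified only at the call site, after all scheme-safe joins are done. A rough sketch under assumed placeholder paths:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Placeholder path; Path.toString preserves the scheme and authority,
// so the write lands on the intended filesystem.
val savePath = new Path("file:///tmp/kyuubi-demo-results", "op-1")
spark.range(10).toDF("id").write
  .option("compression", "zlib").format("orc").save(savePath.toString)
```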
SparkSQLSessionManager.scala:

```diff
@@ -17,7 +17,6 @@
 
 package org.apache.kyuubi.engine.spark.session
 
-import java.nio.file.Paths
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
 import org.apache.hadoop.fs.Path
@@ -188,10 +187,9 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
     if (getSessionConf(KyuubiConf.OPERATION_RESULT_SAVE_TO_FILE, spark)) {
       val sessionSavePath = getSessionResultSavePath(sessionHandle)
       try {
-        val path = new Path(sessionSavePath)
-        val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
-        if (fs.exists(path)) {
-          fs.delete(path, true)
+        val fs = sessionSavePath.getFileSystem(spark.sparkContext.hadoopConfiguration)
+        if (fs.exists(sessionSavePath)) {
+          fs.delete(sessionSavePath, true)
           info(s"Deleted session result path: $sessionSavePath")
         }
       } catch {
@@ -211,17 +209,17 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
 
   override protected def isServer: Boolean = false
 
-  private[spark] def getEngineResultSavePath(): String = {
-    Paths.get(conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR), engineId).toString
+  private[spark] def getEngineResultSavePath(): Path = {
+    new Path(conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR), engineId)
   }
 
-  private def getSessionResultSavePath(sessionHandle: SessionHandle): String = {
-    Paths.get(getEngineResultSavePath(), sessionHandle.identifier.toString).toString
+  private def getSessionResultSavePath(sessionHandle: SessionHandle): Path = {
+    new Path(getEngineResultSavePath(), sessionHandle.identifier.toString)
   }
 
   private[spark] def getOperationResultSavePath(
       sessionHandle: SessionHandle,
-      opHandle: OperationHandle): String = {
-    Paths.get(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString).toString
+      opHandle: OperationHandle): Path = {
+    new Path(getSessionResultSavePath(sessionHandle), opHandle.identifier.toString)
   }
 }
```
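With these helpers returning `Path`, operation results nest under `<dir>/<engineId>/<sessionId>/<operationId>` and the configured scheme survives every join. A sketch with placeholder identifiers:

```scala
import org.apache.hadoop.fs.Path

// Placeholder identifiers, for illustration only.
val dir = "jfs://datalake/tmp"
val enginePath = new Path(dir, "engine-1")
val sessionPath = new Path(enginePath, "session-uuid")
val operationPath = new Path(sessionPath, "operation-uuid")
println(operationPath) // jfs://datalake/tmp/engine-1/session-uuid/operation-uuid
```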
