[KYUUBI #5377] Spark engine query result save to file #5591

Closed · wants to merge 7 commits
3 changes: 3 additions & 0 deletions docs/configuration/settings.md
@@ -390,6 +390,9 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.conf`
| kyuubi.operation.result.arrow.timestampAsString | false | When true, arrow-based rowsets will convert columns of type timestamp to strings for transmission. | boolean | 1.7.0 |
| kyuubi.operation.result.format | thrift | Specify the result format, available configs are: <ul> <li>THRIFT: the result will convert to TRow at the engine driver side. </li> <li>ARROW: the result will be encoded as Arrow at the executor side before collecting by the driver, and deserialized at the client side. note that it only takes effect for kyuubi-hive-jdbc clients now.</li></ul> | string | 1.7.0 |
| kyuubi.operation.result.max.rows | 0 | Max rows of Spark query results. Rows exceeding the limit would be ignored. By setting this value to 0 to disable the max rows limit. | int | 1.6.0 |
| kyuubi.operation.result.saveToFile.dir | /tmp/kyuubi/tmp_kyuubi_result | The directory where Spark query results are saved. It should be a path accessible to every engine. Results are saved in ORC format, and the directory structure is `/OPERATION_RESULT_SAVE_TO_FILE_DIR/engineId/sessionId/statementId`. Each query's result files are deleted when the query finishes. | string | 1.9.0 |
| kyuubi.operation.result.saveToFile.enabled | false | Whether to save Spark query results to a file. | boolean | 1.9.0 |
| kyuubi.operation.result.saveToFile.minSize | 209715200 | The minimum estimated result size required to save Spark results to a file; the default is 200 MB. Spark's `EstimationUtils#getSizePerRow` is used to estimate the output size of the execution plan. | long | 1.9.0 |
| kyuubi.operation.scheduler.pool | &lt;undefined&gt; | The scheduler pool of job. Note that, this config should be used after changing Spark config spark.scheduler.mode=FAIR. | string | 1.1.1 |
| kyuubi.operation.spark.listener.enabled | true | When set to true, Spark engine registers an SQLOperationListener before executing the statement, logging a few summary statistics when each stage completes. | boolean | 1.6.0 |
| kyuubi.operation.status.polling.timeout | PT5S | Timeout(ms) for long polling asynchronous running sql query's status | duration | 1.0.0 |
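For reference, enabling the three `saveToFile` options above in `$KYUUBI_HOME/conf/kyuubi-defaults.conf` might look like the following sketch; the values and the HDFS path are illustrative assumptions, not taken from this PR:

kyuubi.operation.result.saveToFile.enabled=true
# should point at storage reachable by every engine, e.g. an HDFS directory (assumption)
kyuubi.operation.result.saveToFile.dir=hdfs:///tmp/kyuubi/tmp_kyuubi_result
# only results whose estimated size is at least this many bytes are written to files (200 MB)
kyuubi.operation.result.saveToFile.minSize=209715200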
org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -26,6 +26,7 @@ import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

import com.google.common.annotations.VisibleForTesting
import org.apache.hadoop.fs.Path
import org.apache.spark.{ui, SparkConf}
import org.apache.spark.kyuubi.{SparkContextHelper, SparkSQLEngineEventListener, SparkSQLEngineListener}
import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
@@ -37,6 +38,7 @@ import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_SUBMIT_TIME_KEY, KYUUBI_ENGINE_URL}
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, SparkEventHandlerRegister}
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
@@ -58,6 +60,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine")

@volatile private var lifetimeTerminatingChecker: Option[ScheduledExecutorService] = None
@volatile private var stopEngineExec: Option[ThreadPoolExecutor] = None
@volatile private var engineSavePath: Option[String] = None

override def initialize(conf: KyuubiConf): Unit = {
val listener = new SparkSQLEngineListener(this)
@@ -87,6 +90,15 @@ maxInitTimeout > 0) {
maxInitTimeout > 0) {
startFastFailChecker(maxInitTimeout)
}

if (backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
val savePath = backendService.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
engineSavePath = Some(s"$savePath/$engineId")
val path = new Path(engineSavePath.get)
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.mkdirs(path)
fs.deleteOnExit(path)
}
}

override def stop(): Unit = if (shutdown.compareAndSet(false, true)) {
@@ -102,6 +114,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngine")
exec,
Duration(60, TimeUnit.SECONDS))
})
engineSavePath.foreach { p =>
val path = new Path(p)
path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
}
}

def gracefulStop(): Unit = if (gracefulStopDeregistered.compareAndSet(false, true)) {
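Taken together with the session- and operation-level changes below, the save directory introduced in this PR is laid out and cleaned up as follows (the root comes from kyuubi.operation.result.saveToFile.dir; names in angle brackets are placeholders):

/tmp/kyuubi/tmp_kyuubi_result/      <- OPERATION_RESULT_SAVE_TO_FILE_DIR
  <engineId>/                       <- created in SparkSQLEngine.initialize, deleted in stop()
    <sessionId>/                    <- deleted when the session closes (SparkSQLSessionManager)
      <statementId>/                <- written by ExecuteStatement, deleted when the operation closes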
org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -19,14 +19,16 @@ package org.apache.kyuubi.engine.spark.operation

import java.util.concurrent.RejectedExecutionException

import scala.Array._
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
import org.apache.spark.sql.types._

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
@@ -46,6 +48,8 @@ class ExecuteStatement(
override def getOperationLog: Option[OperationLog] = Option(operationLog)
override protected def supportProgress: Boolean = true

private var fetchOrcStatement: Option[FetchOrcStatement] = None
private var saveFileName: Option[String] = None
override protected def resultSchema: StructType = {
if (result == null || result.schema.isEmpty) {
new StructType().add("Result", "string")
@@ -64,6 +68,15 @@ class ExecuteStatement(
OperationLog.removeCurrentOperationLog()
}

override def close(): Unit = {
super.close()
fetchOrcStatement.foreach(_.close())
saveFileName.foreach { p =>
val path = new Path(p)
path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
}
}

protected def incrementalCollectResult(resultDF: DataFrame): Iterator[Any] = {
resultDF.toLocalIterator().asScala
}
@@ -158,6 +171,29 @@ class ExecuteStatement(
override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
})
} else {
val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
lazy val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
resultMaxRows,
resultSaveThreshold,
result)) {
val sessionId = session.handle.identifier.toString
val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
// Rename all columns to avoid write failures caused by duplicate column names
val colName = range(0, result.schema.size).map(x => "col" + x)
// df.write introduces an extra shuffle for the outermost limit and hurts performance
if (resultMaxRows > 0) {
result.toDF(colName: _*).limit(resultMaxRows).write
Review comments on this write call:

Member: Use `resultDF` instead of `result`. Also, is `toDF(colName: _*)` necessary?

Contributor Author: If the result has duplicate columns, we cannot write it to a file, so we rename all column names to avoid this case:

spark.sql("select 1 as a, 2 as a").write.orc("/filepath")
// org.apache.spark.sql.AnalysisException: Found duplicate column(s) when inserting into hdfs

Member: @lsm1 let's add such information to the comments.

Contributor: There is another known issue, as I mentioned in the issue comment: directly calling df.write introduces an extra shuffle for the outermost limit and hurts performance. I think we should also add this known issue to the comment and create a new ticket to track it.

.option("compression", "zstd").format("orc").save(saveFileName.get)
Review comments on the ORC compression option:

Member: IIRC, the zstd implementation of ORC is powered by aircompressor; I'm not sure it is as capable as zstd-jni. Anyway, it's an internal implementation detail, so we can change it later if we find a better solution.

Contributor: Should we also make the written format configurable?

Member (pan3793, Dec 7, 2023): I prefer to keep it internal; it may get out of control if we expose the internal implementation too early.

Contributor: [SPARK-33978][SQL] Support ZSTD compression in ORC data source (https://issues.apache.org/jira/browse/SPARK-33978), Fix Version/s: 3.2.0. Maybe we need a configuration item, or Spark versions below 3.2.0 should compress with zlib. On 3.1.1 bin/spark-shell:

scala> spark.range(10).write.option("compression", "zstd").orc("/tmp/zstd")
java.lang.IllegalArgumentException: Codec [zstd] is not available. Available codecs are uncompressed, lzo, snappy, zlib, none.

(A sketch of this version-gated fallback appears after this file's diff.)

Member: Maybe it's time to have a discussion about dropping support for Spark 3.1.

Member: 😅 We are still using Spark 3.1; it needs time to migrate to a newer Spark version.

} else {
result.toDF(colName: _*).write
.option("compression", "zstd").format("orc").save(saveFileName.get)
}
info(s"Save result to ${saveFileName.get}")
fetchOrcStatement = Some(new FetchOrcStatement(spark))
return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema)
}
val internalArray = if (resultMaxRows <= 0) {
info("Execute in full collect mode")
fullCollectResult(resultDF)
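As a sketch of the version-gated fallback suggested in the compression thread above (not part of this PR), the ORC codec could be derived from the Spark runtime version before writing; `result`, `colName`, and `saveFileName` come from the surrounding diff, and `SPARK_ENGINE_RUNTIME_VERSION` is the same helper this PR already uses in FetchOrcStatement:

import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION

// Spark < 3.2 cannot write ORC with zstd (SPARK-33978), so fall back to zlib there.
val codec = if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") "zstd" else "zlib"
result.toDF(colName: _*)
  .write
  .option("compression", codec)
  .format("orc")
  .save(saveFileName.get)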
org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala (new file, 151 additions)
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.operation

import scala.Array._
import scala.collection.mutable.ListBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.orc.mapred.OrcStruct
import org.apache.orc.mapreduce.OrcInputFormat
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources.RecordReaderIterator
import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
import org.apache.kyuubi.util.reflect.DynConstructors

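/**
 * Reads a query result that was previously saved as ORC files and exposes it as a
 * FetchIterator[Row], deserializing records with Spark's OrcDeserializer.
 */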
class FetchOrcStatement(spark: SparkSession) {

var orcIter: OrcFileIterator = _
def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
val conf = spark.sparkContext.hadoopConfiguration
val savePath = new Path(path)
val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
val list = new ListBuffer[LocatedFileStatus]
while (fsIterator.hasNext) {
val file = fsIterator.next()
if (file.getPath.getName.endsWith(".orc") && file.getLen > 0) {
list += file
}
}
val toRowConverter: InternalRow => Row = {
CatalystTypeConverters.createToScalaConverter(orcSchema)
.asInstanceOf[InternalRow => Row]
}
val colId = range(0, orcSchema.size)
val fullSchema = orcSchema.map(f =>
AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
val deserializer = getOrcDeserializer(orcSchema, colId)
orcIter = new OrcFileIterator(list)
val iterRow = orcIter.map(value =>
unsafeProjection(deserializer.deserialize(value)))
.map(value => toRowConverter(value))
new IterableFetchIterator[Row](iterRow.toIterable)
}

def close(): Unit = {
orcIter.close()
}

private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
try {
if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
// SPARK-34535 changed the constructor signature of OrcDeserializer
DynConstructors.builder()
.impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
.build[OrcDeserializer]()
.newInstance(
orcSchema,
colId)
} else {
DynConstructors.builder()
.impl(
classOf[OrcDeserializer],
classOf[StructType],
classOf[StructType],
classOf[Array[Int]])
.build[OrcDeserializer]()
.newInstance(
new StructType,
orcSchema,
colId)
}
} catch {
case e: Throwable =>
throw new KyuubiException("Failed to create OrcDeserializer", e)
}
}
}

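/**
 * Chains record readers over the given ORC files in order, skipping files that
 * yield no records and closing each underlying reader once it is exhausted.
 */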
class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {

private val iters = fileList.map(x => getOrcFileIterator(x))

var idx = 0

override def hasNext: Boolean = {
val hasNext = iters(idx).hasNext
if (!hasNext) {
iters(idx).close()
idx += 1
// skip empty file
while (idx < iters.size) {
if (iters(idx).hasNext) {
return true
} else {
iters(idx).close()
idx = idx + 1
}
}
}
hasNext
}

override def next(): OrcStruct = {
iters(idx).next()
}

def close(): Unit = {
iters.foreach(_.close())
}

private def getOrcFileIterator(file: LocatedFileStatus): RecordReaderIterator[OrcStruct] = {
val orcRecordReader = {
val split =
new FileSplit(file.getPath, 0, file.getLen, Array.empty[String])
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
new TaskAttemptContextImpl(new Configuration(), attemptId)
val oif = new OrcInputFormat[OrcStruct]
oif.createRecordReader(split, hadoopAttemptContext)
}
new RecordReaderIterator[OrcStruct](orcRecordReader)
}
}
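For orientation, a minimal usage sketch mirroring the call sites in ExecuteStatement above; the path and schema are placeholders, and consuming the result as a plain Scala Iterator assumes FetchIterator extends Iterator[Row]:

val fetchOrc = new FetchOrcStatement(spark)
val rows = fetchOrc.getIterator(
  "/tmp/kyuubi/tmp_kyuubi_result/<engineId>/<sessionId>/<statementId>", // placeholder path
  resultSchema) // StructType of the saved result
rows.take(10).foreach(println)
fetchOrc.close()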
org/apache/kyuubi/engine/spark/session/SparkSQLSessionManager.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark.session

import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import org.apache.hadoop.fs.Path
import org.apache.spark.api.python.KyuubiPythonGatewayServer
import org.apache.spark.sql.SparkSession

@@ -28,6 +29,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel._
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session._
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
@@ -184,6 +186,12 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
info("Session stopped due to shared level is Connection.")
stopSession()
}
if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
Review comments on this cleanup block:

Contributor: nit: only clean up for the ExecuteStatement operation?

Contributor Author: There is no simple way to determine whether the session has executed an ExecuteStatement.

val path = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
s"$engineId/${sessionHandle.identifier}")
path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
info(s"Delete session result file $path")
}
}

private def stopSession(): Unit = {