[KYUUBI #5377] Spark engine query result save to file #5591
Changes from all commits: 1dc07a5, 979125d, 42634a1, 80e1f0d, 73d3c3a, 3c70a1e, 9d1a18c
ExecuteStatement.scala:
@@ -19,14 +19,16 @@ package org.apache.kyuubi.engine.spark.operation

import java.util.concurrent.RejectedExecutionException

import scala.Array._
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
import org.apache.spark.sql.types._

import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf.OPERATION_RESULT_MAX_ROWS
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_DIR, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.operation.{ArrayFetchIterator, FetchIterator, IterableFetchIterator, OperationHandle, OperationState}
@@ -46,6 +48,8 @@ class ExecuteStatement(
  override def getOperationLog: Option[OperationLog] = Option(operationLog)
  override protected def supportProgress: Boolean = true

  private var fetchOrcStatement: Option[FetchOrcStatement] = None
  private var saveFileName: Option[String] = None
  override protected def resultSchema: StructType = {
    if (result == null || result.schema.isEmpty) {
      new StructType().add("Result", "string")
@@ -64,6 +68,15 @@ class ExecuteStatement(
    OperationLog.removeCurrentOperationLog()
  }

  override def close(): Unit = {
    super.close()
    fetchOrcStatement.foreach(_.close())
    saveFileName.foreach { p =>
      val path = new Path(p)
      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
    }
  }

  protected def incrementalCollectResult(resultDF: DataFrame): Iterator[Any] = {
    resultDF.toLocalIterator().asScala
  }
@@ -158,6 +171,29 @@ class ExecuteStatement(
        override def iterator: Iterator[Any] = incrementalCollectResult(resultDF)
      })
    } else {
      val resultSaveEnabled = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE, spark)
      lazy val resultSaveThreshold = getSessionConf(OPERATION_RESULT_SAVE_TO_FILE_MINSIZE, spark)
      if (hasResultSet && resultSaveEnabled && shouldSaveResultToFs(
          resultMaxRows,
          resultSaveThreshold,
          result)) {
        val sessionId = session.handle.identifier.toString
        val savePath = session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)
        saveFileName = Some(s"$savePath/$engineId/$sessionId/$statementId")
        // Rename all col name to avoid duplicate columns
        val colName = range(0, result.schema.size).map(x => "col" + x)
        // df.write will introduce an extra shuffle for the outermost limit, and hurt performance
        if (resultMaxRows > 0) {
          result.toDF(colName: _*).limit(resultMaxRows).write
Review comments (on `result.toDF(colName: _*).limit(resultMaxRows).write`; see the sketch after this diff):
- If the result has duplicate columns, we cannot write it to a file, so we rename all column names to avoid this case.
- @lsm1 let's add such information to the comments.
- There is another known issue, as I mentioned in the issue comment. I think we should also add this known issue to the comment and create a new ticket to track it.
.option("compression", "zstd").format("orc").save(saveFileName.get) | ||
Review comments (on the `compression` option; see the sketch after this diff):
- IIRC, the zstd implementation for ORC is powered by aircompressor; I'm not sure it is as capable as zstd-jni. Anyway, it's an internal implementation detail, so we can change it later if we find a better solution.
- Should we also make the written format configurable?
- I prefer to keep it internal; it may get out of control if we expose the internal implementation too early.
- [SPARK-33978][SQL] Support ZSTD compression in ORC data source. Maybe we need a configuration item, or Spark versions below 3.2.0 could compress with zlib. On 3.1.1:
  bin/spark-shell
  scala> spark.range(10).write.option("compression", "zstd").orc("/tmp/zstd")
  java.lang.IllegalArgumentException: Codec [zstd] is not available. Available codecs are uncompressed, lzo, snappy, zlib, none.
- Maybe it's time to have a discussion about dropping support for Spark 3.1.
- 😅 We are still using Spark 3.1; it needs time to migrate to a newer Spark version.
        } else {
          result.toDF(colName: _*).write
            .option("compression", "zstd").format("orc").save(saveFileName.get)
        }
        info(s"Save result to $saveFileName")
        fetchOrcStatement = Some(new FetchOrcStatement(spark))
        return fetchOrcStatement.get.getIterator(saveFileName.get, resultSchema)
      }
      val internalArray = if (resultMaxRows <= 0) {
        info("Execute in full collect mode")
        fullCollectResult(resultDF)
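Tying the two review threads above together, here is a minimal sketch (not part of the PR) of a writer that renames columns positionally and falls back to zlib on Spark versions older than 3.2.0, where the ORC data source rejects zstd (SPARK-33978). The `ResultFileWriter` object, its method names, and the naive version comparison are illustrative assumptions; the PR itself keeps the codec hard-coded to zstd.

```scala
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.DataFrame

// Hypothetical helper, not part of this PR.
object ResultFileWriter {

  // Duplicate column names (e.g. SELECT 1 AS a, 2 AS a) cannot be written to a
  // file source, so every column gets a positional name first.
  private def dedupColumns(df: DataFrame): DataFrame =
    df.toDF(df.schema.indices.map(i => s"col$i"): _*)

  // SPARK-33978 added zstd support to the ORC data source in Spark 3.2.0;
  // earlier versions fail with "Codec [zstd] is not available".
  // Naive string comparison, for illustration only.
  private def orcCodec: String = if (SPARK_VERSION >= "3.2") "zstd" else "zlib"

  def write(df: DataFrame, dir: String, maxRows: Int): Unit = {
    val renamed = dedupColumns(df)
    // Apply the row cap before writing, mirroring the resultMaxRows handling above.
    val bounded = if (maxRows > 0) renamed.limit(maxRows) else renamed
    bounded.write.option("compression", orcCodec).format("orc").save(dir)
  }
}
```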
FetchOrcStatement.scala (new file):
@@ -0,0 +1,151 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kyuubi.engine.spark.operation

import scala.Array._
import scala.collection.mutable.ListBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.orc.mapred.OrcStruct
import org.apache.orc.mapreduce.OrcInputFormat
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.execution.datasources.RecordReaderIterator
import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.KyuubiException
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
import org.apache.kyuubi.util.reflect.DynConstructors

class FetchOrcStatement(spark: SparkSession) {

  var orcIter: OrcFileIterator = _

  def getIterator(path: String, orcSchema: StructType): FetchIterator[Row] = {
    val conf = spark.sparkContext.hadoopConfiguration
    val savePath = new Path(path)
    val fsIterator = savePath.getFileSystem(conf).listFiles(savePath, false)
    val list = new ListBuffer[LocatedFileStatus]
    while (fsIterator.hasNext) {
      val file = fsIterator.next()
      if (file.getPath.getName.endsWith(".orc") && file.getLen > 0) {
        list += file
      }
    }
    val toRowConverter: InternalRow => Row = {
      CatalystTypeConverters.createToScalaConverter(orcSchema)
        .asInstanceOf[InternalRow => Row]
    }
    val colId = range(0, orcSchema.size)
    val fullSchema = orcSchema.map(f =>
      AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
    val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
    val deserializer = getOrcDeserializer(orcSchema, colId)
    orcIter = new OrcFileIterator(list)
    val iterRow = orcIter.map(value =>
      unsafeProjection(deserializer.deserialize(value)))
      .map(value => toRowConverter(value))
    new IterableFetchIterator[Row](iterRow.toIterable)
  }

  def close(): Unit = {
    orcIter.close()
  }

  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
    try {
      if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
        // SPARK-34535 changed the constructor signature of OrcDeserializer
        DynConstructors.builder()
          .impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
          .build[OrcDeserializer]()
          .newInstance(
            orcSchema,
            colId)
      } else {
        DynConstructors.builder()
          .impl(
            classOf[OrcDeserializer],
            classOf[StructType],
            classOf[StructType],
            classOf[Array[Int]])
          .build[OrcDeserializer]()
          .newInstance(
            new StructType,
            orcSchema,
            colId)
      }
    } catch {
      case e: Throwable =>
        throw new KyuubiException("Failed to create OrcDeserializer", e)
    }
  }
}

class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {

  private val iters = fileList.map(x => getOrcFileIterator(x))

  var idx = 0

  override def hasNext: Boolean = {
    val hasNext = iters(idx).hasNext
    if (!hasNext) {
      iters(idx).close()
(cxzl25 marked a review conversation on this line as resolved.)
      idx += 1
      // skip empty file
      while (idx < iters.size) {
        if (iters(idx).hasNext) {
          return true
        } else {
          iters(idx).close()
          idx = idx + 1
        }
      }
    }
    hasNext
  }

  override def next(): OrcStruct = {
    iters(idx).next()
  }

  def close(): Unit = {
    iters.foreach(_.close())
  }

  private def getOrcFileIterator(file: LocatedFileStatus): RecordReaderIterator[OrcStruct] = {
    val orcRecordReader = {
      val split =
        new FileSplit(file.getPath, 0, file.getLen, Array.empty[String])
      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
      val hadoopAttemptContext =
        new TaskAttemptContextImpl(new Configuration(), attemptId)
      val oif = new OrcInputFormat[OrcStruct]
      oif.createRecordReader(split, hadoopAttemptContext)
    }
    new RecordReaderIterator[OrcStruct](orcRecordReader)
  }
}
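For orientation, a minimal sketch of how the read-back path above could be exercised on its own, assuming a directory already written by ExecuteStatement. The directory, schema, and the row-printing loop (which assumes the returned FetchIterator can be consumed like a standard Scala Iterator) are illustrative placeholders, not part of the PR.

```scala
import org.apache.kyuubi.engine.spark.operation.FetchOrcStatement
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Illustrative driver, not part of this PR.
object FetchOrcStatementExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Placeholder path and schema; in the PR these come from saveFileName and resultSchema.
    val savedDir = "/tmp/kyuubi-result/engine-0/session-0/statement-0"
    val schema = StructType(Seq(StructField("col0", LongType)))

    val fetchStatement = new FetchOrcStatement(spark)
    try {
      // Lists the *.orc files under savedDir and deserializes them lazily into Rows.
      val rows = fetchStatement.getIterator(savedDir, schema)
      while (rows.hasNext) {
        println(rows.next())
      }
    } finally {
      // Releases the underlying ORC record readers.
      fetchStatement.close()
    }
  }
}
```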
SparkSQLSessionManager.scala:
@@ -19,6 +19,7 @@ package org.apache.kyuubi.engine.spark.session

import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import org.apache.hadoop.fs.Path
import org.apache.spark.api.python.KyuubiPythonGatewayServer
import org.apache.spark.sql.SparkSession
@@ -28,6 +29,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel._
import org.apache.kyuubi.engine.spark.{KyuubiSparkUtil, SparkSQLEngine}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.engineId
import org.apache.kyuubi.engine.spark.operation.SparkSQLOperationManager
import org.apache.kyuubi.session._
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
@@ -184,6 +186,12 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
      info("Session stopped due to shared level is Connection.")
      stopSession()
    }
    if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
Review comments (on the line above; see the sketch after this diff):
- nit: only clean up for the operation.
- There is no simple way to determine whether the session has executed a statement that saved its result to a file.
      val path = new Path(s"${conf.get(OPERATION_RESULT_SAVE_TO_FILE_DIR)}/" +
        s"$engineId/${sessionHandle.identifier}")
      path.getFileSystem(spark.sparkContext.hadoopConfiguration).delete(path, true)
      info(s"Delete session result file $path")
    }
  }

  private def stopSession(): Unit = {
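To make the cleanup discussion above concrete, here is a small sketch of the directory hierarchy implied by the diffs: results land under `<saveDir>/<engineId>/<sessionId>/<statementId>`, operation close deletes the statement directory, and session close deletes the whole session directory. The `ResultFileCleanup` object and its helper names are illustrative, not part of the PR.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative helper, not part of this PR.
object ResultFileCleanup {

  // Deleted by ExecuteStatement.close() when the operation finishes.
  def statementDir(saveDir: String, engineId: String, sessionId: String, statementId: String): Path =
    new Path(s"$saveDir/$engineId/$sessionId/$statementId")

  // Deleted by SparkSQLSessionManager on session close; this also covers any
  // statement directories whose operation-level cleanup did not run.
  def sessionDir(saveDir: String, engineId: String, sessionId: String): Path =
    new Path(s"$saveDir/$engineId/$sessionId")

  def deleteRecursively(path: Path, hadoopConf: Configuration): Boolean =
    path.getFileSystem(hadoopConf).delete(path, true)
}
```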