[KYUUBI #5377] Spark engine query result save to file #5591
Conversation
Codecov Report

Additional details and impacted files:

```diff
@@             Coverage Diff              @@
##             master    #5591      +/-   ##
============================================
- Coverage     61.51%   61.34%    -0.18%
  Complexity       23       23
============================================
  Files           608      609        +1
  Lines         36091    36252      +161
  Branches       4952     4993       +41
============================================
+ Hits          22201    22237       +36
- Misses        11506    11616      +110
- Partials       2384     2399       +15
```

☔ View full report in Codecov by Sentry.
Review thread on ...s/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala (resolved).
```scala
val fileName = s"$savePath/$engineId/$sessionId/$statementId"
val colName = range(0, result.schema.size).map(x => "col" + x)
if (resultMaxRows > 0) {
  result.toDF(colName: _*).limit(resultMaxRows).write
```
Use `resultDF` instead of `result`. Also, is `toDF(colName: _*)` necessary?
If the result has duplicate columns, we cannot write it to a file, so we rename all column names to avoid this case:

```scala
spark.sql("select 1 as a, 2 as a").write.save("/filepath")
// org.apache.spark.sql.AnalysisException: Found duplicate column(s) when inserting into hdfs
```
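A minimal, self-contained sketch of the renaming trick (the `local[*]` session and output path are illustrative, not the PR's actual wiring):

```scala
import org.apache.spark.sql.SparkSession

object DuplicateColumnWriteDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dup-col-demo").getOrCreate()
    // Both columns are named `a`; writing this DataFrame directly fails with
    // AnalysisException: Found duplicate column(s).
    val result = spark.sql("select 1 as a, 2 as a")
    // Rename columns positionally (col0, col1, ...) so the written schema is unique,
    // mirroring the approach in the snippet above.
    val colNames = result.schema.indices.map(i => s"col$i")
    result.toDF(colNames: _*).write.mode("overwrite").orc("/tmp/dup-col-demo")
    spark.stop()
  }
}
```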
@lsm1 let's add such information to the comments.
There is another known issue, as I mentioned in the issue comment: directly calling `df.write` will introduce an extra shuffle for the outermost limit and hurt performance. I think we should also add this known issue to the comment and create a new ticket to track it.
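To make the concern concrete, here is a small illustration assuming a `SparkSession` named `spark` (the plan text is approximate and version-dependent): a global limit feeding a write is planned as `LocalLimit` → single-partition `Exchange` → `GlobalLimit`, and that exchange is the extra shuffle.

```scala
val df = spark.range(0, 1000000L).toDF("id")
df.limit(10000).explain()
// == Physical Plan == (approximate)
// GlobalLimit 10000
// +- Exchange SinglePartition    <- extra shuffle introduced by the outermost limit
//    +- LocalLimit 10000
//       +- Range (0, 1000000, ...)
```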
Review thread on ...k-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala (outdated, resolved).
```scala
}

override def next(): OrcStruct = {
  if (iters(idx).hasNext) {
```
`iters(idx).hasNext` has already been called in the `hasNext` method.
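For context, a minimal sketch (names hypothetical) of the chained-iterator pattern under discussion, where `hasNext` advances past exhausted per-file iterators so `next()` can rely on it:

```scala
class ChainedIterator[T](iters: IndexedSeq[Iterator[T]]) extends Iterator[T] {
  private var idx = 0

  override def hasNext: Boolean = {
    // Skip exhausted iterators; afterwards idx points at a non-empty
    // iterator or past the end.
    while (idx < iters.length && !iters(idx).hasNext) idx += 1
    idx < iters.length
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("no more rows")
    // hasNext has already positioned idx, so re-checking iters(idx).hasNext
    // here (the reviewer's point) is unnecessary.
    iters(idx).next()
  }
}
```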
```scala
  .getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE))
lazy val threshold =
  session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD)
if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
```
`shouldSaveResultToHdfs` is inferred from the execution plan and may not be accurate. Should we change it to `sparkSave || shouldSaveResultToHdfs`?
I prefer to keep it as-is; we need a configuration to disable this feature globally.
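A sketch of what a plan-based estimate can look like (hypothetical helper; the real `shouldSaveResultToHdfs` may differ): it reads the optimizer's size statistics, which is exactly why it can be inaccurate.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical: spill results to a file only when the optimizer estimates
// the result to be larger than thresholdBytes. The statistics are estimates
// derived from the optimized logical plan, not measured sizes.
def estimatedResultExceeds(df: DataFrame, thresholdBytes: Long): Boolean =
  df.queryExecution.optimizedPlan.stats.sizeInBytes > BigInt(thresholdBytes)
```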
It might be simpler for us to make the changes in this change.
This may lose the ordering of the query data, e.g. for queries with an ORDER BY.
I did a simple test and the results were as expected. (Test env: Kyuubi 1.8.0 + Spark 3.5.0)
we still call …
Can we combine this with incremental collection mode?
Maybe you can test the scenario of generating multiple files.
Do you mean a large data set or multiple tasks?
Review thread on .../kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala (outdated, resolved).
The output seems to be in order even when outputting multiple files.
When we use incremental collection mode, it may significantly impact performance.
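For reference, incremental collection in Spark terms is roughly `toLocalIterator` versus `collect` (a sketch, assuming a plain DataFrame `df`): it runs one job per partition, which lowers driver memory pressure but can be much slower end-to-end.

```scala
// Fetches one partition at a time to the driver: low memory, many jobs.
val incremental = df.toLocalIterator()

// Fetches everything in one job: fast, but the whole result must fit in driver memory.
val allAtOnce = df.collect()
```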
Review threads on .../kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala and ...k-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala (outdated, resolved).
When Spark reads a datasource, the partitions are sorted by file length, so there is no ordering guarantee. From `org.apache.spark.sql.execution.datasources.v2.FileScan#partitions`:

```scala
partition.files.flatMap { file =>
  PartitionedFileUtil.splitFiles(
    file = file,
    isSplitable = isSplitable(file.getPath),
    maxSplitBytes = maxSplitBytes,
    partitionValues = partitionValues
  )
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
```
Review threads on kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala (outdated, resolved).
Review thread on ...k-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala (outdated, resolved).
@@ -184,6 +186,12 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)

```scala
      info("Session stopped due to shared level is Connection.")
      stopSession()
    }
    if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
```
nit: only clean up for the `ExecuteStatement` operation?
There is no simple way to determine whether the session has executed an `ExecuteStatement`.
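For illustration, session-level cleanup can be as simple as recursively deleting the session's result directory on close (a sketch with hypothetical parameter names, mirroring the `$savePath/$engineId/$sessionId` layout from the snippet above):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

def cleanupSessionResultDir(
    savePath: String,
    engineId: String,
    sessionId: String,
    hadoopConf: Configuration): Unit = {
  val dir = new Path(s"$savePath/$engineId/$sessionId")
  val fs = dir.getFileSystem(hadoopConf)
  // Recursive delete; the existence check makes this a no-op for sessions
  // that never spilled a result to file.
  if (fs.exists(dir)) fs.delete(dir, true)
}
```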
A question: if the result needs to be ordered, and we save the result into files and then read them back when the client fetches the result, won't the result returned to users lose the expected ordering?
Thanks all. Merging to master (v1.9.0).
# 🔍 Description

## Issue References 🔗

#5591 (comment)

Use zlib compression when the Spark version is less than 3.2.

Closes #5895 from lsm1/branch-kyuubi-5377-followup.
Closes #5377

4219d28 [Fei Wang] nit
31d4fc1 [senmiaoliu] use zlib when SPARK version less than 3.2

Lead-authored-by: senmiaoliu <senmiaoliu@trip.com>
Co-authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Fei Wang <fwang12@ebay.com>
# 🔍 Description

Followup of #5591. Support getting the existing limit from more plans and regard the result max rows.

Closes #5963 from turboFei/incremental_save.
Closes #5377

223d510 [Fei Wang] use optimized plan
ecefc2a [Fei Wang] use spark plan
57091e5 [Fei Wang] minor
2096144 [Fei Wang] for logical plan
0f734ee [Fei Wang] ut
fdc1155 [Fei Wang] save
f8e405a [Fei Wang] math.min

Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Fei Wang <fwang12@ebay.com>
Closes #5591 from lsm1/branch-kyuubi-5377.
Closes #5377

9d1a18c [senmiaoliu] ignore empty file
3c70a1e [LSM] fix doc
73d3c3a [senmiaoliu] fix style and add some comment
80e1f0d [senmiaoliu] Close orc fetchOrcStatement and remove result save file when ExecuteStatement close
42634a1 [senmiaoliu] fix style
979125d [senmiaoliu] fix style
1dc07a5 [senmiaoliu] spark engine save into hdfs file

Lead-authored-by: senmiaoliu <senmiaoliu@trip.com>
Co-authored-by: LSM <senmiaoliu@trip.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
# 🔍 Description

## Issue References 🔗

This pull request fixes #6437.

## Describe Your Solution 🔧

Use `org.apache.hadoop.fs.Path` instead of `java.nio.file.Paths` to avoid an unexpected change to the `OPERATION_RESULT_SAVE_TO_FILE_DIR` scheme.

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

The Spark job fails to start with `java.io.IOException: JuiceFS initialized failed for jfs:///` under the conf `kyuubi.operation.result.saveToFile.dir=jfs://datalake/tmp`; `hdfs://xxx:port/tmp` may encounter similar errors.

#### Behavior With This Pull Request 🎉

Users can use an HDFS dir as `kyuubi.operation.result.saveToFile.dir` without error.

#### Related Unit Tests

It seems no test suites were added in #5591 and #5986; I'll try to build a dist and test with our internal cluster.

Closes #6444 from camper42/save-to-hdfs.
Closes #6437

990f0a7 [camper42] [Kyuubi #6437] Fix Spark engine query result save to HDFS

Authored-by: camper42 <camper.xlii@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
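A small sketch of the difference (paths illustrative, behavior as on a Unix filesystem): `java.nio.file.Paths` treats the string as a plain local path and collapses the `//` after the scheme, while `org.apache.hadoop.fs.Path` parses it as a URI and preserves scheme and authority.

```scala
import java.nio.file.Paths
import org.apache.hadoop.fs.Path

val dir = "jfs://datalake/tmp"
// java.nio.file normalizes away the double slash, mangling the scheme:
Paths.get(dir, "session-1").toString  // "jfs:/datalake/tmp/session-1"
// hadoop Path keeps the URI intact:
new Path(dir, "session-1").toString   // "jfs://datalake/tmp/session-1"
```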
### Why are the changes needed?

Close #5377.

### How was this patch tested?

- Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- Add screenshots for manual tests if appropriate
- Run tests locally before making a pull request

### Was this patch authored or co-authored using generative AI tooling?

NO