[KYUUBI #5377] Spark engine query result save to file #5591

Closed
lsm1 wants to merge 7 commits into apache:master from lsm1:branch-kyuubi-5377

Conversation

@lsm1 (Contributor) commented Oct 31, 2023

Why are the changes needed?

close #5377

How was this patch tested?

  • Add some test cases that check the changes thoroughly including negative and positive cases if possible

  • Add screenshots for manual tests if appropriate

  • Run test locally before making a pull request

Was this patch authored or co-authored using generative AI tooling?

NO

@codecov-commenter commented Oct 31, 2023

Codecov Report

Attention: 99 lines in your changes are missing coverage. Please review.

Comparison is base (4463cc8) 61.51% compared to head (9d1a18c) 61.34%.
Report is 3 commits behind head on master.

| Files | Patch % | Lines |
|---|---|---|
| ...ubi/engine/spark/operation/FetchOrcStatement.scala | 0.00% | 57 Missing ⚠️ |
| ...uubi/engine/spark/operation/ExecuteStatement.scala | 24.00% | 18 Missing and 1 partial ⚠️ |
| ...rg/apache/kyuubi/engine/spark/SparkSQLEngine.scala | 18.18% | 8 Missing and 1 partial ⚠️ |
| ...g/apache/spark/sql/kyuubi/SparkDatasetHelper.scala | 0.00% | 8 Missing ⚠️ |
| .../engine/spark/session/SparkSQLSessionManager.scala | 0.00% | 4 Missing and 1 partial ⚠️ |
| ...in/scala/org/apache/kyuubi/config/KyuubiConf.scala | 93.75% | 0 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #5591      +/-   ##
============================================
- Coverage     61.51%   61.34%   -0.18%     
  Complexity       23       23              
============================================
  Files           608      609       +1     
  Lines         36091    36252     +161     
  Branches       4952     4993      +41     
============================================
+ Hits          22201    22237      +36     
- Misses        11506    11616     +110     
- Partials       2384     2399      +15     

☔ View full report in Codecov by Sentry.

val fileName = s"$savePath/$engineId/$sessionId/$statementId"
val colName = range(0, result.schema.size).map(x => "col" + x)
if (resultMaxRows > 0) {
result.toDF(colName: _*).limit(resultMaxRows).write
Member commented:

Use `resultDF` instead of `result`. Also, is `toDF(colName: _*)` necessary?

Contributor (author, @lsm1) replied:

If the result has duplicate columns, we cannot write it to a file, so we rename all columns to avoid this case:

spark.sql("select 1 as a, 2 as a").write.save("/filepath")

org.apache.spark.sql.AnalysisException: Found duplicate column(s) when inserting into hdfs
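To make the duplicate-column point concrete, here is a minimal sketch of the rename workaround discussed above; the path and the DataFrame are placeholders, not the patch's actual code:

```scala
// Sketch: writing a DataFrame with duplicate column names fails, so rename
// every column to a positional name (col0, col1, ...) before writing.
// "/tmp/result" is an illustrative path only.
val df = spark.sql("select 1 as a, 2 as a")

// df.write.parquet("/tmp/result")   // AnalysisException: duplicate column `a`

val renamed = df.toDF(df.schema.indices.map(i => s"col$i"): _*)
renamed.write.parquet("/tmp/result") // succeeds: columns are col0, col1
```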

Member replied:
@lsm1 let's add such information to the comments

Contributor commented:

There is another known issue, as I mentioned in the issue comment:

directly calling df.write will introduce an extra shuffle for the outermost limit and hurt performance

I think we should also add this known issue to the comment and create a new ticket to track it.
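A rough way to see the concern, sketched with a hypothetical table and path (the exact physical operators depend on the Spark version, so treat this as an illustration rather than the patch's behavior):

```scala
// Sketch: an outermost LIMIT that is only collect()-ed can be planned as
// CollectLimit (no extra shuffle), while feeding the same query into a file
// write forces the limit to be materialized, typically adding an Exchange.
// `src` and "/tmp/out" are placeholders.
val limited = spark.sql("select * from src limit 100")

limited.explain()                                   // usually shows CollectLimit
limited.write.mode("overwrite").parquet("/tmp/out")
// compare the write job's plan in the Spark UI: look for the extra Exchange
```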

}

override def next(): OrcStruct = {
if (iters(idx).hasNext) {
Member commented:

`iters(idx).hasNext` has already been called in the `hasNext` method.
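The pattern this review comment points at, sketched as a plain Scala iterator over several underlying iterators (illustrative names; OrcStruct is replaced by a type parameter so the sketch stays self-contained):

```scala
// Sketch: let hasNext advance past exhausted iterators; next() just delegates,
// so the per-iterator hasNext check is not repeated there.
class ConcatIterator[T](iters: IndexedSeq[Iterator[T]]) extends Iterator[T] {
  private var idx = 0

  override def hasNext: Boolean = {
    while (idx < iters.length && !iters(idx).hasNext) idx += 1
    idx < iters.length
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("no more elements")
    iters(idx).next()
  }
}
```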

.getOrElse(session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE))
lazy val threshold =
session.sessionManager.getConf.get(OPERATION_RESULT_SAVE_TO_FILE_THRESHOLD)
if (hasResultSet && sparkSave && shouldSaveResultToHdfs(resultMaxRows, threshold, result)) {
Member commented:

`shouldSaveResultToHdfs` is inferred from the execution plan and may not be accurate. Should we change it to `sparkSave || shouldSaveResultToHdfs`?

Member replied:

I prefer to keep it as-is; we need a configuration to disable this feature globally.

@wForget (Member) commented Nov 9, 2023

It might be simpler for us to make changes in the executeStatement method, like:

change

result = spark.sql(statement)

to

if (saveResultToPath) {
  spark.sql(statement).write.format(format).save(resultPath)
  result = spark.read.load(resultPath)
} else {
  result = spark.sql(statement)
}

WDYT? cc @pan3793 @cxzl25

@cxzl25 (Contributor) commented Nov 14, 2023

It might be simpler for us to make changes in the executeStatement method, like:

result = spark.read.load(resultPath)

This may lose the ordering of the query data, e.g. order by xx limit 100

@wForget (Member) commented Nov 15, 2023

This may lose the ordering of the query data, e.g. order by xx limit 100

I did a simple test and the results were as expected. (Test Env: Kyuubi 1.8.0 + Spark 3.5.0)

create table wangzhen_test_20231115_t1(id bigint, name string) stored as parquet;
insert into wangzhen_test_20231115_t1 values (1, 'a');
insert into wangzhen_test_20231115_t1 values (2, 'b');
insert into wangzhen_test_20231115_t1 values (3, 'c');

set kyuubi.operation.language=scala;

val df = spark.sql("select * from wangzhen_test_20231115_t1 order by id limit 2");
df.write.format("parquet").save("hdfs://XXX/result.parquet");

spark.sql("set kyuubi.operation.language=sql");
select * from `parquet`.`hdfs://XXX/result.parquet`;
== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand (5)
+- WriteFiles (4)
   +- TakeOrderedAndProject (3)
      +- * ColumnarToRow (2)
         +- Scan parquet spark_catalog.XXX.wangzhen_test_20231115_t1 (1)


@lsm1 (Contributor, author) commented Nov 15, 2023

if (saveResultToPath) {
  spark.sql(statement).write.format(format).save(resultPath)
  result = spark.read.load(resultPath)
} else {
  result = spark.sql(statement)
}

We still call result.collect() later; if the result is too large, we cannot avoid driver OOM.

@wForget (Member) commented Nov 15, 2023

We still call result.collect() later; if the result is too large, we cannot avoid driver OOM.

Can we combine this with incremental collection mode?

@cxzl25 (Contributor) commented Nov 15, 2023

I did a simple test and the results were as expected

Maybe you can test the scenario of generating multiple files

@wForget (Member) commented Nov 15, 2023

Maybe you can test the scenario of generating multiple files

Do you mean a large data set or multiple tasks?

@wForget (Member) commented Nov 15, 2023

Maybe you can test the scenario of generating multiple files

The output seems to be in order even when outputting multiple files.

set spark.sql.files.maxRecordsPerFile=10000;

set kyuubi.operation.language=scala;

spark.range(0, 1000000, 1, numPartitions = 10)
  .selectExpr("id", "cast(id as string) as name")
  .createOrReplaceTempView("wangzhen_test_20231115_tmp1")

val df = spark.sql("select * from wangzhen_test_20231115_tmp1 order by id limit 100000");
df.write.format("parquet").save("hdfs://XXX/result");

spark.sql("set kyuubi.operation.language=sql");
select * from `parquet`.`hdfs://XXX/result`;


@lsm1 (Contributor, author) commented Nov 23, 2023

We still call result.collect() later; if the result is too large, we cannot avoid driver OOM.

Can we combine this with incremental collection mode?

When we use incremental collection mode, it may significantly impact performance.
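For context, incremental collection essentially means pulling the result one partition at a time instead of collect()-ing it all at once; a minimal sketch of that trade-off (the table name is a placeholder, and this is not the engine's actual code):

```scala
import org.apache.spark.sql.Row

// Sketch: toLocalIterator() fetches one partition at a time to the driver,
// bounding driver memory by the largest partition rather than the full result,
// at the cost of running the partitions sequentially (hence the performance hit).
val result = spark.sql("select * from big_table")   // placeholder query

val rows: java.util.Iterator[Row] = result.toLocalIterator()
while (rows.hasNext) {
  val row = rows.next()
  // stream `row` to the client instead of buffering the whole result
}
```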

@lsm1 force-pushed the branch-kyuubi-5377 branch 2 times, most recently from 59add37 to 9b7ce8d on December 1, 2023 08:23
@cxzl25 (Contributor) commented Dec 4, 2023

The output seems to be in order even when outputting multiple files.

When Spark reads a datasource, the input files are sorted by file length, so there is no ordering guarantee.

org.apache.spark.sql.execution.datasources.v2.FileScan#partitions

      partition.files.flatMap { file =>
        PartitionedFileUtil.splitFiles(
          file = file,
          isSplitable = isSplitable(file.getPath),
          maxSplitBytes = maxSplitBytes,
          partitionValues = partitionValues
        )
      }.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

https://github.com/apache/spark/blob/4398bb5d37328e2f3594302d98f98803a379a2e9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala#L146-L160

@@ -184,6 +186,12 @@ class SparkSQLSessionManager private (name: String, spark: SparkSession)
info("Session stopped due to shared level is Connection.")
stopSession()
}
if (conf.get(OPERATION_RESULT_SAVE_TO_FILE)) {
Contributor commented:

nit: only clean up for the ExecuteStatement operation?

Contributor (author, @lsm1) replied:

There is no simple way to determine whether the session has executed an ExecuteStatement.
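For reference, the session-level cleanup under discussion amounts to deleting the per-session result directory when the session closes; a hedged sketch using the `$savePath/$engineId/$sessionId` layout quoted earlier (the method and variable names are illustrative, not the patch's):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: remove the session's result directory on session close, regardless
// of which operations were actually executed in that session.
def cleanupSessionResultDir(
    savePath: String, engineId: String, sessionId: String,
    hadoopConf: Configuration): Unit = {
  val dir = new Path(s"$savePath/$engineId/$sessionId")
  val fs = dir.getFileSystem(hadoopConf)
  if (fs.exists(dir)) {
    fs.delete(dir, true) // recursive delete
  }
}
```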

@turboFei (Member) commented Dec 13, 2023

A question:

If the result needs to be ordered, and we save it into files and then read the files back when the client fetches the result, will the result returned to users still be ordered as expected?

@pan3793 (Member) commented Dec 13, 2023

@turboFei from the context, I think the implementation already preserves the global order. @cxzl25 could you please clarify?

@lsm1 (Contributor, author) commented Dec 13, 2023

A question:

If the result needs to be ordered, and we save it into files and then read the files back when the client fetches the result, will the result returned to users still be ordered as expected?

  1. Spark saves an ordered result to multiple part-X files in the filesystem, in order of the keys; see org.apache.spark.rdd.OrderedRDDFunctions#sortByKey:
     https://github.com/lsm1/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala#L47

     /**
      * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
      * `collect` or `save` on the resulting RDD will return or output an ordered list of records
      * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
      * order of the keys).
      */

  2. When FetchOrcStatement reads the files back, it sorts them by file name, so it returns the result in order (see the sketch below).
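A minimal sketch of point 2, listing the part files of a result directory and processing them in file-name order (Hadoop FileSystem API; the helper name is illustrative and this is not the exact FetchOrcStatement code):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: part-00000, part-00001, ... were written in key order, so reading
// them back sorted by file name preserves the global ordering of the result.
def orderedPartFiles(resultDir: Path, fs: FileSystem): Seq[Path] = {
  fs.listStatus(resultDir)
    .map(_.getPath)
    .filter(_.getName.startsWith("part-"))
    .sortBy(_.getName)
    .toSeq
}
```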

@cfmcgrady (Contributor) commented:

Thanks all. Merging to master (v1.9.0).

@cfmcgrady added this to the v1.9.0 milestone Dec 13, 2023
@cfmcgrady closed this in 4c029f9 Dec 13, 2023
turboFei added a commit that referenced this pull request Dec 21, 2023
# 🔍 Description
## Issue References 🔗

#5591 (comment)

Closes #5895 from lsm1/branch-kyuubi-5377-followup.

Closes #5377

4219d28 [Fei Wang] nit
31d4fc1 [senmiaoliu] use zlib when SPARK version less than 3.2

Lead-authored-by: senmiaoliu <senmiaoliu@trip.com>
Co-authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Fei Wang <fwang12@ebay.com>
turboFei added a commit that referenced this pull request Feb 4, 2024
…sult max rows

# 🔍 Description
Followup #5591
Support to get existing limit from more plan and regard the result max rows.

Closes #5963 from turboFei/incremental_save.

Closes #5377

223d510 [Fei Wang] use optimized plan
ecefc2a [Fei Wang] use spark plan
57091e5 [Fei Wang] minor
2096144 [Fei Wang] for logical plan
0f734ee [Fei Wang] ut
fdc1155 [Fei Wang] save
f8e405a [Fei Wang] math.min

Authored-by: Fei Wang <fwang12@ebay.com>
Signed-off-by: Fei Wang <fwang12@ebay.com>
zhaohehuhu pushed a commit to zhaohehuhu/incubator-kyuubi that referenced this pull request Feb 5, 2024

…ard result max rows

Closes apache#5963 from turboFei/incremental_save.

Closes apache#5377
zhaohehuhu pushed a commit to zhaohehuhu/incubator-kyuubi that referenced this pull request Mar 21, 2024

Closes apache#5591 from lsm1/branch-kyuubi-5377.

Closes apache#5377

9d1a18c [senmiaoliu] ignore empty file
3c70a1e [LSM] fix doc
73d3c3a [senmiaoliu] fix style and add some comment
80e1f0d [senmiaoliu] Close orc fetchOrcStatement and remove result save file when ExecuteStatement close
42634a1 [senmiaoliu] fix style
979125d [senmiaoliu] fix style
1dc07a5 [senmiaoliu] spark engine save into hdfs file

Lead-authored-by: senmiaoliu <senmiaoliu@trip.com>
Co-authored-by: LSM <senmiaoliu@trip.com>
Signed-off-by: Fu Chen <cfmcgrady@gmail.com>
zhaohehuhu pushed a commit to zhaohehuhu/incubator-kyuubi that referenced this pull request Mar 21, 2024

Closes apache#5895 from lsm1/branch-kyuubi-5377-followup.

Closes apache#5377
zhaohehuhu pushed a commit to zhaohehuhu/incubator-kyuubi that referenced this pull request Mar 21, 2024

…ard result max rows

Closes apache#5963 from turboFei/incremental_save.

Closes apache#5377
pan3793 pushed a commit that referenced this pull request Jun 4, 2024
# 🔍 Description
## Issue References 🔗

This pull request fixes #6437

## Describe Your Solution 🔧

Use `org.apache.hadoop.fs.Path` instead of `java.nio.file.Paths` to avoid `OPERATION_RESULT_SAVE_TO_FILE_DIR` scheme unexpected change.

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

Spark Job failed to start with error: `java.io.IOException: JuiceFS initialized failed for jfs:///` with conf `kyuubi.operation.result.saveToFile.dir=jfs://datalake/tmp`.

`hdfs://xxx:port/tmp` may encounter similar errors

#### Behavior With This Pull Request 🎉

Users can use an HDFS dir as `kyuubi.operation.result.saveToFile.dir` without error.

#### Related Unit Tests

Seems no test suites added in #5591 and #5986, I'll try to build a dist and test with our internal cluster.

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6444 from camper42/save-to-hdfs.

Closes #6437

990f0a7 [camper42] [Kyuubi #6437] Fix Spark engine query result save to HDFS

Authored-by: camper42 <camper.xlii@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
pan3793 pushed a commit that referenced this pull request Jun 4, 2024

990f0a7 [camper42] [Kyuubi #6437] Fix Spark engine query result save to HDFS

Authored-by: camper42 <camper.xlii@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit 71649da)
Signed-off-by: Cheng Pan <chengpan@apache.org>
Successfully merging this pull request may close these issues.

[TASK][MEDIUM] Spark engine query results support reading from HDFS