SPARK-23325: Use InternalRow when reading with DataSourceV2. #21118

Conversation
Test build #89661 has finished for PR 21118 at commit
Force-pushed from eddd049 to 72f3c1a.
@jose-torres, @cloud-fan, can you take a look at this? It updates the v2 API to use InternalRow by default in the read path. I'll follow up with a patch for the write path, or we can include it here if you'd prefer.
Test build #89662 has finished for PR 21118 at commit
Test build #89663 has finished for PR 21118 at commit
Generally looks good. IIRC, there's some arcane reason why plan nodes need to produce UnsafeRow even though SparkPlan.execute() declares InternalRow. So we may need to add a projection in DataSourceV2ScanExec.
Yeah, we should probably add a projection. It's probably only working because the InternalRows that are produced are all UnsafeRow.
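For context, a minimal sketch of the kind of unsafe projection such a plan node could apply to its InternalRow output; the helper name and the `inputRDD`/`output` parameters are illustrative, not the actual DataSourceV2ScanExec code.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}

// Wrap a scan's InternalRow output in an UnsafeProjection so downstream
// operators that assume UnsafeRow keep working. The projection is created
// per partition because generated projections are not thread-safe.
def toUnsafeRows(inputRDD: RDD[InternalRow], output: Seq[Attribute]): RDD[InternalRow] = {
  inputRDD.mapPartitions { iter =>
    val toUnsafe = UnsafeProjection.create(output, output)
    iter.map(toUnsafe)
  }
}
```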
Test build #89665 has finished for PR 21118 at commit
@rdblue, could you fix the remaining compile error?

[error] /home/jenkins/workspace/SparkPullRequestBuilder@2/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:676: value createUnsafeRowReaderFactories is not a member of org.apache.spark.sql.kafka010.KafkaMicroBatchReader
[error] val factories = reader.createUnsafeRowReaderFactories().asScala
[error]
 * changed in the future Spark versions.
 */
@InterfaceStability.Unstable
public interface SupportsScanUnsafeRow extends DataSourceReader {
I think we still need this trait.

In Spark SQL there is a contract that all operators produce UnsafeRow, except for some Dataset-related operators. We have operators that assume the input rows are UnsafeRow and do a type cast, e.g. operators which use GenerateUnsafeRowJoiner.

That is to say, the data source scan has to do an unsafe projection to make sure it produces unsafe rows. This is a waste if the data source already produces unsafe rows.

For file-based data sources, we solve this issue by adding a flag, needsUnsafeRowConversion, to decide if we need the unsafe projection or not. Another solution is what @rdblue proposed in the dev list discussion: do an isInstanceOf[UnsafeRow] check for each input row and skip the unsafe projection if it's already an unsafe row. That may have a performance penalty because of the per-row check.

So this trait is still useful, at least for the built-in file-based data sources.
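To illustrate the alternative mentioned above, here is a rough sketch of the per-row check; the `ensureUnsafe` helper is hypothetical and only shows the idea of skipping the projection when a row is already unsafe.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}

// Only copy to the unsafe format when the incoming row is not already an
// UnsafeRow; sources that emit UnsafeRow pay no extra copy, but every row
// pays an isInstanceOf check.
def ensureUnsafe(iter: Iterator[InternalRow], output: Seq[Attribute]): Iterator[InternalRow] = {
  lazy val toUnsafe = UnsafeProjection.create(output, output)
  iter.map {
    case unsafe: UnsafeRow => unsafe
    case row => toUnsafe(row)
  }
}
```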
The check I was referring to is implemented in generated code. The projection added in DataSourceV2Strategy handles the cases where part or all of the incoming row is UnsafeRow.
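As a quick standalone illustration (an assumption-laden snippet, not Spark's own test code), the same generated unsafe projection accepts any InternalRow input, whether or not it is already an UnsafeRow:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._

val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", LongType)))
val toUnsafe = UnsafeProjection.create(schema)

val generic = InternalRow(1, 2L)        // a plain, non-unsafe row from a source
val unsafe = toUnsafe(generic).copy()   // the same data, already in the unsafe format

// Both calls produce UnsafeRow output; the projection does not care which
// InternalRow subclass it receives.
val out1 = toUnsafe(generic)
val out2 = toUnsafe(unsafe)
```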
@@ -86,7 +87,7 @@ class KafkaContinuousReader(
     KafkaSourceOffset(JsonUtils.partitionOffsets(json))
   }

-  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
+  override def createReadTasks(): ju.List[ReadTask[InternalRow]] = {
please do the renaming in an individual PR.
I've moved this to #21145. I'll rebase this PR on that one, so let's try to get that in first.
Force-pushed from 6006123 to 16f1b6e.
Test build #89863 has finished for PR 21118 at commit
@cloud-fan and @jose-torres: I looked at ...

I've looked at a few simple queries with filters, projects, and aggregation and it doesn't look like any of the generated code depends on UnsafeRow. If there is no requirement for the rows to be UnsafeRow ... If the rows passed from the data source are UnsafeRow ... If the rows passed from the data source are not ...

In all of the cases that I've looked at, a copy to unsafe would only slow down execution. Unsafe rows may have better cache locality, but the copy reads all of the data anyway. If I'm right and we do not need to insert a copy to UnsafeRow ...
parquet scan doesn't need unsafe row because it outputs ColumnarBatch. I believe if you add an aggregate, you will see ...
@cloud-fan, actually I tried a lot of different queries yesterday, including joins and aggregations with vectorized reads turned off. The only thing that didn't work was ... I'm planning to do more testing, but I don't see anything that requires UnsafeRow.
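For anyone who wants to repeat this kind of check, something like the following works; the path, columns, and query here are made up, and the point is just disabling the vectorized Parquet reader and inspecting the generated code:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug._   // adds debugCodegen() to Dataset

val spark = SparkSession.builder().master("local[*]").appName("codegen-check").getOrCreate()
// Force the non-vectorized (InternalRow-based) Parquet read path.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

val df = spark.read.parquet("/tmp/example")
  .filter("id > 10")
  .groupBy("category")
  .count()

// Dump the whole-stage generated code and look for UnsafeRow-specific casts.
df.debugCodegen()
```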
all the places that use ... Note that we don't enforce this at the API level, i.e. ... That said, you may not be able to see ...
@cloud-fan, let me clarify what I'm getting at here. It appears that Spark makes at least one copy of data to unsafe when reading any Parquet row. If the projection includes partition columns, then Spark makes a second copy to unsafe. Two copies of every row read from Parquet is a fairly common occurrence, even if the plan doesn't need the data to be unsafe. Most of the operators I've been looking at -- including codegen operators -- support InternalRow. What we need to find out is: ...

This is something we should look into now because it has the potential to be a good speed improvement for queries that use nested data (and can't use the vectorized read path). In addition, it will only get harder to remove needless dependence on UnsafeRow.
I just did a performance test based on our 2.1.1 and a real table. I tested a full scan of an hour of data with a single data filter. The scan had 13,083 tasks and read 1084.8 GB. I used identical Spark applications with 100 executors, each with 1 core and 6 GB memory.
Clearly, this is not a benchmark. But this shows a 6% performance improvement for not making unnecessary copies. Eliminating copies is a pretty easy way to get better performance, if we can update a few operators to work with both InternalRow and UnsafeRow.
@rdblue, this is a good point. Since not all the operators need unsafe rows, we can save the copy at the data source side if we don't need to produce unsafe rows. Actually we had such a mechanism before: #10511. But I'm not sure if it's worth bringing it back. We expect data sources to produce ...
I don't think it is a good idea to introduce additions for file sources. Part of the motivation for the v2 API is to get rid of those. Besides, I don't think we need it if we handle conversion in Spark instead of in the data sources. I think we should update the physical plan and push both filters and projections into the v2 scan node. Then data sources won't need to produce UnsafeRow.
I disagree. The vectorized path isn't used for all Parquet table scans and we should continue to care about its performance.
@cloud-fan: This PR is also related to #21262 because that PR updates the conversion from logical to physical plan and handles projections and filtering. We could modify that strategy to always ensure that there is a projection ...
This partially brings #10511 back, and we need to plan the project and filter together with the data source scan to make sure an unsafe row is produced at the end. If we want to go this way, I think we should fully bring back #10511 to make this contract explicit, i.e. which operators produce unsafe rows and which operators only accept unsafe rows as input. After this, file sources do not need ... On the other hand, do you think it's possible that a data source can directly produce unsafe rows? e.g. it copies its data to unsafe rows directly, without an intermediate ...
Whether internal operators should expect UnsafeRow ... The solution I'm suggesting would ensure that sources can produce InternalRow ... This would be better than having the data sources handle conversion to unsafe because it avoids extra copies when Spark needs to project anyway, and it moves the conversion after any filters on top of the scan.
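A hedged sketch of the planning idea being discussed: keep the filter directly on top of the scan and put the projection (whose output is unsafe) above it, so the conversion happens after filtering and the source can emit plain InternalRow. The helper below is illustrative, not the actual DataSourceV2Strategy code.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}

// Compose the physical operators so that ProjectExec, which produces
// UnsafeRow, sits above FilterExec and the v2 scan node.
def planV2Scan(
    scan: SparkPlan,
    filter: Option[Expression],
    project: Seq[NamedExpression]): SparkPlan = {
  val filtered = filter.map(FilterExec(_, scan)).getOrElse(scan)
  ProjectExec(project, filtered)
}
```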
Force-pushed from 16f1b6e to b5d2c9f.
Test build #90478 has finished for PR 21118 at commit
Test build #90480 has finished for PR 21118 at commit
Rebased on master to fix conflicts.
Test build #93365 has finished for PR 21118 at commit
so where are we on this? looks like we have 2 LGTM?
@cloud-fan, any update on merging this?
retest this please
LGTM, let's merge it when the tests pass (the last pass was 4 days ago)
Test build #93500 has finished for PR 21118 at commit
Thanks! Merged to master.
Thanks for reviewing and merging @cloud-fan, @gatorsmile, @felixcheung!
## What changes were proposed in this pull request?

This is a follow up of apache#21118 . In apache#21118 we added `SupportsDeprecatedScanRow`. Ideally data source should produce `InternalRow` instead of `Row` for better performance. We should remove `SupportsDeprecatedScanRow` and encourage data sources to produce `InternalRow`, which is also very easy to build.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>
Closes apache#21921 from cloud-fan/row.

## What changes were proposed in this pull request?

A follow up of #21118. Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>
Closes #21948 from cloud-fan/row-write.
@@ -76,5 +76,5 @@
   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
   * submitted.
   */
-  List<InputPartition<Row>> planInputPartitions();
+  List<InputPartition<InternalRow>> planInputPartitions();
I am sorry for a question in an old PR like this, and I think it might not be directly related to this PR, but please allow me to ask a question here. Does this mean developers should produce InternalRow here for each partition? InternalRow is under catalyst and not meant to be exposed.
The rationale is, data source v2 is not stable yet, and we should make it usable first, to get more people to implement data sources and provide feedback. Eventually we should design a stable and efficient row builder in data source v2, but for now we should switch to InternalRow to make it usable. Row is too slow to implement a decent data source (like Iceberg).
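As a small illustration of how easy InternalRow is to build (the schema and values here are made up), a reader can construct rows with GenericInternalRow and the catalyst value types, e.g. UTF8String for strings:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.unsafe.types.UTF8String

// Assumed schema: (id: Long, name: String, score: Double).
// Field order and catalyst value types must match the reader's schema.
def toInternalRow(id: Long, name: String, score: Double): InternalRow =
  new GenericInternalRow(Array[Any](id, UTF8String.fromString(name), score))
```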
Ah, okie. thanks!
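Tying this thread together, a minimal reader for the diff shown above could look roughly like the sketch below, assuming the 2.4-era InputPartition and InputPartitionReader interfaces (an illustration, not a complete data source):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

// Emits (i, -i) rows for i in [start, end); InternalRow.apply builds a
// GenericInternalRow, so no external Row objects are involved.
class RangeInputPartition(start: Long, end: Long) extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new InputPartitionReader[InternalRow] {
      private var current = start - 1
      override def next(): Boolean = { current += 1; current < end }
      override def get(): InternalRow = InternalRow(current, -current)
      override def close(): Unit = ()
    }
}
```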
What changes were proposed in this pull request?
This updates the DataSourceV2 API to use InternalRow instead of Row for the default case with no scan mix-ins.
Support for readers that produce Row is added through SupportsDeprecatedScanRow, which matches the previous API. Readers that used Row now implement this class and should be migrated to InternalRow.
Readers that previously implemented SupportsScanUnsafeRow have been migrated to use no SupportsScan mix-ins and produce InternalRow.
How was this patch tested?
This uses existing tests.
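For readers that still produce external Row objects behind SupportsDeprecatedScanRow, roughly the kind of conversion Spark performs can be reproduced with RowEncoder. This is a 2.4-era sketch (RowEncoder#toRow was later replaced by createSerializer in Spark 3.0), and the schema and values are made up:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)))

// Convert an external Row into the InternalRow representation used by the engine.
val encoder = RowEncoder(schema)
val internal: InternalRow = encoder.toRow(Row("alice", 30))
```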