[SPARK-24991][SQL] use InternalRow in DataSourceWriter #21948
Conversation
@cloud-fan, thanks! I am a bot who has found some folks who might be able to help with the review: @gatorsmile, @zsxwing and @tdas
@@ -89,8 +89,7 @@ class RateStreamContinuousReader(options: DataSourceOptions) extends ContinuousR
       start.runTimeMs,
       i,
       numPartitions,
-      perPartitionRate)
-      .asInstanceOf[InputPartition[InternalRow]]
+      perPartitionRate): InputPartition[InternalRow]
This is needed to make it compile, but at least we don't need to do a cast.
Sounds good to me.
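For context on the cast-vs-ascription point above, here is a minimal sketch of the compiler behavior; `RatePartition` and `planPartitions` are made-up stand-ins for illustration, not the actual Spark classes:

```scala
import java.util.{List => JList}
import scala.collection.JavaConverters._

// Made-up stand-ins for the real interfaces, just to show the compiler behavior.
trait InputPartition[T]
class RatePartition extends InputPartition[String]

def planPartitions(numPartitions: Int): JList[InputPartition[String]] = {
  Range(0, numPartitions).map { i =>
    // Without the ascription, the Seq is inferred as Seq[RatePartition], and
    // .asJava would produce java.util.List[RatePartition], which does not
    // conform to java.util.List[InputPartition[String]] because Java lists are
    // invariant. Ascribing the element type (instead of casting each element
    // with asInstanceOf) lets inference widen the collection to the supertype.
    new RatePartition: InputPartition[String]
  }.asJava
}
```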
This should be in a separate commit. I didn't notice yesterday that this is for the writer until it was linked from the other issue. I think this change needs to get in, but it should not be mixed into changes for the write path.
lgtm
Test build #93898 has finished for PR 21948 at commit
/**
 * When true, Spark will reuse the same data object instance when sending data to the data writer,
 * for better performance. Data writers should carefully handle the data objects if it's reused,
 * e.g. do not buffer the data objects in a list. By default it returns false for safety, data
@rdblue did you hit this problem in Iceberg?
No, Iceberg assumes that data objects are reused.
Test build #93937 has finished for PR 21948 at commit
Test build #93938 has finished for PR 21948 at commit
retest this please
Test build #93974 has finished for PR 21948 at commit
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
  while (iter.hasNext) {
    dataWriter.write(iter.next())
    // Internally Spark reuse the same UnsafeRow instance when producing output rows, here we
nit: reuse => reuses
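For readers following the diff, a simplified sketch of the write-task pattern under review; the trait definitions are trimmed stand-ins for the DSv2 interfaces and `runTask` is a made-up name, not the real `DataWritingSparkTask`:

```scala
// Trimmed stand-ins for the DSv2 writer interfaces, for illustration only.
trait WriterCommitMessage
trait DataWriter[T] {
  def write(record: T): Unit
  def commit(): WriterCommitMessage
  def abort(): Unit
}

def runTask[T](iter: Iterator[T], dataWriter: DataWriter[T]): WriterCommitMessage = {
  try {
    // The iterator may hand back the same reused row instance (e.g. UnsafeRow)
    // on every call, so the writer must copy if it buffers records.
    while (iter.hasNext) {
      dataWriter.write(iter.next())
    }
    dataWriter.commit()
  } catch {
    case t: Throwable =>
      // On any failure, give the writer a chance to clean up before rethrowing.
      dataWriter.abort()
      throw t
  }
}
```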
/**
 * When true, Spark will reuse the same data object instance when sending data to the data writer,
 * for better performance. Data writers should carefully handle the data objects if it's reused,
Nit: in `if it's reused`, the `it` here is ambiguous. Maybe change to `if they are reused`?
/**
 * When true, Spark will reuse the same data object instance when sending data to the data writer,
 * for better performance. Data writers should carefully handle the data objects if it's reused,
 * e.g. do not buffer the data objects in a list. By default it returns false for safety, data
nit: `By default it returns false, data sources` => `By default the method returns false. Data sources`
LGTM otherwise.
 * sources can override it if their data writers immediately write the data object to somewhere
 * else like a memory buffer or disk.
 */
default boolean reuseDataObject() {
I don't think this should be added in this commit. This is to move to `InternalRow` and should not alter the API. If we want to add this, then let's discuss it in a separate PR as a feature. I'm fine documenting the default reuse behavior in this commit, though.
I think writers are responsible for defensive copies if necessary. This default is going to cause sources to be slower, and I don't think it is necessary for implementations other than tests that buffer data in memory.
In general, I think it's okay for us to have higher expectations of sources than of users. It's okay to simply document that rows are reused.
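To make the defensive-copy expectation concrete, here is a rough sketch of a buffering writer, assuming the Spark 2.x DSv2 package layout; `BufferingDataWriter` and `BufferedRowsMessage` are hypothetical names, not classes from this PR:

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Hypothetical commit message class, not one of Spark's built-in messages.
case class BufferedRowsMessage(rows: Seq[InternalRow]) extends WriterCommitMessage

// A writer that buffers rows must copy them, because Spark hands it the same
// InternalRow (typically UnsafeRow) instance for every record.
class BufferingDataWriter extends DataWriter[InternalRow] {
  private val buffer = new ArrayBuffer[InternalRow]()

  override def write(record: InternalRow): Unit = {
    buffer += record.copy() // defensive copy before keeping a reference
  }

  override def commit(): WriterCommitMessage = BufferedRowsMessage(buffer.toSeq)

  override def abort(): Unit = buffer.clear()
}
```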
I'm changing my +1 to -1 because read-side changes are mixed in and because copies are the responsibility of data sources if they buffer and hold references to earlier rows. At a minimum, changing that expectation should be done in a separate issue and PR.
@rdblue I have documented the object reuse behavior and asked data sources to handle it, please take a look, thanks!
Test build #94106 has finished for PR 21948 at commit
retest this please
Test build #94201 has finished for PR 21948 at commit
retest this please
@@ -89,7 +89,8 @@ class RateStreamContinuousReader(options: DataSourceOptions) extends ContinuousR
       start.runTimeMs,
       i,
       numPartitions,
-      perPartitionRate): InputPartition[InternalRow]
+      perPartitionRate)
+      .asInstanceOf[InputPartition[InternalRow]]
Why is this cast necessary?
This is to address your comment about not mixing in read-side changes, so I reverted it.
Okay, sorry about that. I should have looked at the whole diff.
-    MemoryWriterCommitMessage(0, Seq(InternalRow(1), InternalRow(2))),
-    MemoryWriterCommitMessage(1, Seq(InternalRow(3), InternalRow(4))),
-    MemoryWriterCommitMessage(2, Seq(InternalRow(6), InternalRow(7)))
+    MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
Why was this changed back to Row?
Now the `DataWriter` needs to copy the input row before buffering it, which can be done by the `RowEncoder` when converting `InternalRow` to `Row`. Then the write message carries `Row`s to the driver side.
Why not use `InternalRow.copy`? I'd rather keep the update to `InternalRow`, but as long as the tests pass I wouldn't block this.
Because the memory sink needs `Row`s at the end. Instead of collecting `InternalRow`s via copy and then converting them to `Row`s, I think it's more efficient to collect `Row`s directly.
Sounds good. Thanks for explaining your rationale.
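For readers who want to see what that conversion looks like, a rough sketch assuming the Spark 2.x `RowEncoder` API; `toExternal` is a hypothetical helper, not code from this PR:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Assumes the Spark 2.x encoder API: RowEncoder(schema) yields an
// ExpressionEncoder[Row]; after resolveAndBind, fromRow turns an InternalRow
// into a fresh external Row, so the conversion doubles as the defensive copy
// before the rows are buffered for the driver side.
val schema = StructType(Seq(StructField("value", IntegerType)))
val encoder = RowEncoder(schema).resolveAndBind()

def toExternal(internal: InternalRow): Row = encoder.fromRow(internal)
```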
@cloud-fan, thanks for documenting the behavior and removing the default copy. I had a couple of questions, but I think it is close.
retest this please
+1 when tests are passing.
 * Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data
 * object instance when sending data to the data writer, for better performance. Data writers
 * are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a
 * list.
nit: the description about defensive copies in data writers may be put in `DataWriter`.
I'll fix it in my next PR, thanks!
LGTM
Test build #94233 has finished for PR 21948 at commit
retest this please
Test build #94263 has finished for PR 21948 at commit
thanks, merging to master!
A follow up of apache#21118. Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes apache#21948 from cloud-fan/row-write.
(cherry picked from commit ac527b5)
Conflicts:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterProvider.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
A follow up of apache#21118. Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes apache#21948 from cloud-fan/row-write.
A follow up of apache#21118. Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes apache#21948 from cloud-fan/row-write.
Ref: LIHADOOP-48531 RB=1855948 G=superfriends-reviewers R=yezhou,latang,mshen,fli,zolin A=
What changes were proposed in this pull request?
A follow up of #21118
Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.
How was this patch tested?
Existing tests.