[SPARK-24991][SQL] use InternalRow in DataSourceWriter #21948

Closed
cloud-fan wants to merge 3 commits into apache:master from cloud-fan:row-write

Conversation

cloud-fan
Contributor

What changes were proposed in this pull request?

A follow up of #21118

Since we use InternalRow in the read API of data source v2, we should do the same thing for the write API.
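
In sketch form, a write-side implementation now receives the engine's internal representation directly instead of `Row`. A minimal sketch, assuming the v2 writer interface of that era (`org.apache.spark.sql.sources.v2.writer.DataWriter`) and omitting the factory plumbing:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Previously: class CountingWriter extends DataWriter[Row]
class CountingWriter extends DataWriter[InternalRow] {
  private var count = 0L

  override def write(record: InternalRow): Unit = {
    // Consume the record right away (here we only count it); no Row conversion
    // is needed on the per-record hot path anymore.
    count += 1
  }

  override def commit(): WriterCommitMessage = new WriterCommitMessage {}
  override def abort(): Unit = ()
}
```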

How was this patch tested?

existing tests.

@holdensmagicalunicorn

@cloud-fan, thanks! I am a bot who has found some folks who might be able to help with the review: @gatorsmile, @zsxwing and @tdas

@@ -89,8 +89,7 @@ class RateStreamContinuousReader(options: DataSourceOptions) extends ContinuousR
        start.runTimeMs,
        i,
        numPartitions,
-       perPartitionRate)
-         .asInstanceOf[InputPartition[InternalRow]]
+       perPartitionRate): InputPartition[InternalRow]
Contributor Author

This is needed to make it compile, but at least we don't need to do a cast.
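
Illustratively, with toy types rather than the real reader classes, the difference between the ascription and the cast it replaces:

```scala
object AscriptionVsCast {
  trait InputPartition[T]
  class IntPartition extends InputPartition[Int]

  // Type ascription: the compiler verifies that IntPartition really is an InputPartition[Int].
  val checked = (new IntPartition): InputPartition[Int]

  // asInstanceOf compiles against almost any target type; mistakes only show up at
  // runtime, or not at all once the type parameter is erased.
  val unchecked = (new IntPartition).asInstanceOf[InputPartition[String]]
}
```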

Contributor

Sounds good to me.

Contributor

This should be in a separate commit. I didn't notice until it was linked from the other issue yesterday that this is for the writer. I think this change needs to get in, but it should not be mixed into changes for the write path.

@cloud-fan
Contributor Author

cc @rdblue @jose-torres @gatorsmile

@jose-torres
Contributor

lgtm

@SparkQA

SparkQA commented Aug 1, 2018

Test build #93898 has finished for PR 21948 at commit 852c6f3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

/**
* When true, Spark will reuse the same data object instance when sending data to the data writer,
* for better performance. Data writers should carefully handle the data objects if it's reused,
* e.g. do not buffer the data objects in a list. By default it returns false for safety, data
Contributor Author

@rdblue did you hit this problem in Iceberg?

Contributor

No, Iceberg assumes that data objects are reused.

@SparkQA

SparkQA commented Aug 2, 2018

Test build #93937 has finished for PR 21948 at commit a42e628.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 2, 2018

Test build #93938 has finished for PR 21948 at commit 1e0cb90.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member

kiszk commented Aug 2, 2018

retest this please

@SparkQA

SparkQA commented Aug 2, 2018

Test build #93974 has finished for PR 21948 at commit 1e0cb90.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
while (iter.hasNext) {
dataWriter.write(iter.next())
// Internally Spark reuse the same UnsafeRow instance when producing output rows, here we
Member

nit: `reuse` => `reuses`


/**
* When true, Spark will reuse the same data object instance when sending data to the data writer,
* for better performance. Data writers should carefully handle the data objects if it's reused,
Member

Nit: in `if it's reused`, the `it` is ambiguous. Maybe change it to `if they are reused`?

/**
* When true, Spark will reuse the same data object instance when sending data to the data writer,
* for better performance. Data writers should carefully handle the data objects if it's reused,
* e.g. do not buffer the data objects in a list. By default it returns false for safety, data
Member

nit: `By default it returns false, data sources` => `By default the method returns false. Data sources`

Member

@gengliangwang left a comment


LGTM otherwise.

* sources can override it if their data writers immediately write the data object to somewhere
* else like a memory buffer or disk.
*/
default boolean reuseDataObject() {
Contributor

@rdblue Aug 2, 2018


I don't think this should be added in this commit. This commit is for moving to InternalRow and should not alter the API. If we want to add this, let's discuss it in a separate PR as its own feature. I'm fine documenting the default reuse behavior in this commit, though.

I think writers are responsible for defensive copies if necessary. This default is going to make sources slower, and I don't think it is necessary for implementations other than tests that buffer data in memory.

In general, I think it's okay for us to have higher expectations of sources than of users. It's okay to simply document that rows are reused.
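
A minimal sketch of that writer-side responsibility, assuming the same v2 `DataWriter` interface as above (`BufferedRowsMessage` is a made-up commit message for illustration):

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

case class BufferedRowsMessage(rows: Seq[InternalRow]) extends WriterCommitMessage

// A writer that holds rows until commit must copy, because Spark may hand it the
// same mutable InternalRow instance on every call; a writer that streams each
// record straight to its sink can skip the copy entirely.
class BufferingWriter extends DataWriter[InternalRow] {
  private val buffer = ArrayBuffer.empty[InternalRow]

  override def write(record: InternalRow): Unit = {
    buffer += record.copy() // defensive copy: otherwise every entry aliases the reused row
  }

  override def commit(): WriterCommitMessage = BufferedRowsMessage(buffer.toSeq)
  override def abort(): Unit = buffer.clear()
}
```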

@rdblue
Contributor

rdblue commented Aug 2, 2018

I'm changing my +1 to -1 because read-side changes are mixed in and because copies are the responsibility of data sources if they buffer and hold references to earlier rows. At a minimum, changing that expectation should be done in a separate issue and PR.

@cloud-fan
Contributor Author

@rdblue I have documented the object reuse behavior and asked data sources to handle it. Please take a look, thanks!

@SparkQA

SparkQA commented Aug 3, 2018

Test build #94106 has finished for PR 21948 at commit 86817c7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
  • case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
  • class MemoryDataWriter(partition: Int, outputMode: OutputMode, schema: StructType)

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Aug 4, 2018

Test build #94201 has finished for PR 21948 at commit 86817c7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
  • case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
  • class MemoryDataWriter(partition: Int, outputMode: OutputMode, schema: StructType)

@gatorsmile
Member

retest this please

@@ -89,7 +89,8 @@ class RateStreamContinuousReader(options: DataSourceOptions) extends ContinuousR
        start.runTimeMs,
        i,
        numPartitions,
-       perPartitionRate): InputPartition[InternalRow]
+       perPartitionRate)
+         .asInstanceOf[InputPartition[InternalRow]]
Contributor

Why is this cast necessary?

Contributor Author

This is to address your comment about not mixing in read-side changes, so I reverted it.

Contributor

Okay, sorry about that. I should have looked at the whole diff.

-      MemoryWriterCommitMessage(0, Seq(InternalRow(1), InternalRow(2))),
-      MemoryWriterCommitMessage(1, Seq(InternalRow(3), InternalRow(4))),
-      MemoryWriterCommitMessage(2, Seq(InternalRow(6), InternalRow(7)))
+      MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
Contributor

Why was this changed back to Row?

Contributor Author

Now the DataWriter needs to copy the input row before buffering it, which can be done by the RowEncoder when converting InternalRow to Row. Then the write message carries Rows to the driver side.

Contributor

Why not use InternalRow.copy? I'd rather keep the update to InternalRow, but as long as the tests pass I wouldn't block this.

Contributor Author

Because the memory sink needs Rows at the end. Instead of collecting InternalRows via copy and then converting them to Rows, I think it's more efficient to collect Rows directly.
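
A rough sketch of that trade-off (the `toRow` function stands in for whatever `InternalRow`-to-`Row` conversion the sink uses, e.g. a deserializer built from `RowEncoder`; the exact encoder API differs across Spark versions):

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow

class MemorySinkBuffer(toRow: InternalRow => Row) {
  private val buffer = ArrayBuffer.empty[Row]

  def add(record: InternalRow): Unit = {
    // Converting immediately allocates a fresh Row, so the reused InternalRow
    // instance is never retained and no separate defensive copy is needed.
    // The alternative would type the buffer as ArrayBuffer[InternalRow], append
    // record.copy() here, and convert the whole batch to Rows at commit time.
    buffer += toRow(record)
  }

  def rows: Seq[Row] = buffer.toSeq
}
```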

Contributor

Sounds good. Thanks for explaining your rationale.

@rdblue
Contributor

rdblue commented Aug 5, 2018

@cloud-fan, thanks for documenting the behavior and removing the default copy. I had a couple of questions, but I think it is close.

@cloud-fan
Contributor Author

retest this please

@rdblue
Contributor

rdblue commented Aug 5, 2018

+1 when tests are passing.

* Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data
* object instance when sending data to the data writer, for better performance. Data writers
* are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a
* list.
Member

nit: the description about defensive copies in data writers may be better placed in DataWriter.

Contributor Author

I'll fix it in my next PR, thanks!

@viirya
Member

viirya commented Aug 5, 2018

LGTM

@SparkQA

SparkQA commented Aug 5, 2018

Test build #94233 has finished for PR 21948 at commit 86817c7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
  • case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
  • class MemoryDataWriter(partition: Int, outputMode: OutputMode, schema: StructType)

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Aug 6, 2018

Test build #94263 has finished for PR 21948 at commit 86817c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
  • case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
  • class MemoryDataWriter(partition: Int, outputMode: OutputMode, schema: StructType)

@cloud-fan
Contributor Author

thanks, merging to master!

@asfgit closed this in ac527b5 on Aug 6, 2018
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Mar 7, 2019
A follow up of apache#21118

Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21948 from cloud-fan/row-write.

(cherry picked from commit ac527b5)

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterProvider.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
	sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
	sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
rdblue pushed a commit to rdblue/spark that referenced this pull request Apr 3, 2019
A follow up of apache#21118

Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21948 from cloud-fan/row-write.
jzhuge pushed a commit to jzhuge/spark that referenced this pull request Oct 15, 2019
A follow up of apache#21118

Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21948 from cloud-fan/row-write.

(cherry picked from commit ac527b5)

Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousWriteRDD.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterProvider.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
	sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
	sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
A follow up of apache#21118

Since we use `InternalRow` in the read API of data source v2, we should do the same thing for the write API.

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21948 from cloud-fan/row-write.

Ref: LIHADOOP-48531

RB=1855948
G=superfriends-reviewers
R=yezhou,latang,mshen,fli,zolin
A=