[SPARK-18025] Use commit protocol API in structured streaming #15710
isAppend)

WriteOutput.write(
I'm thinking I should just rename WriteOutput to FileFormatOutput
Test build #3387 has finished for PR 15710.
Test build #67873 has finished for PR 15710.
Test build #67874 has finished for PR 15710.
Test build #67877 has finished for PR 15710.
Test build #67913 has finished for PR 15710.
Test build #67912 has finished for PR 15710.
}

override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
  if (addedFiles.nonEmpty) {
Is this just an optimization to avoid instantiating the fs for empty writes?
I was copying the same logic from before, but I think so.
Actually, the other thing is that we are using head. Technically we could use headOption and then map over it, but that would be pretty weird.
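To make the trade-off in this thread concrete, here is a minimal sketch of the two styles side by side. It does not use Spark or Hadoop: `FakeFs`, `fsFor`, and both `commit...` functions are hypothetical stand-ins, assuming only that the real code derives a file system handle from the first added file and then looks up a status for every file.

```scala
object HeadOptionSketch {
  // Hypothetical stand-in for a file system handle; not a Hadoop FileSystem.
  final case class FakeFs(scheme: String) {
    // Pretend "status" lookup: the scheme plus the path length.
    def status(path: String): String = s"$scheme:${path.length}"
  }

  // Derive the handle from a path's scheme, e.g. "file:/a" gives FakeFs("file").
  def fsFor(path: String): FakeFs = FakeFs(path.takeWhile(_ != ':'))

  // Style under review: guard with nonEmpty, then call head.
  def commitWithGuard(addedFiles: Seq[String]): Seq[String] =
    if (addedFiles.nonEmpty) {
      val fs = fsFor(addedFiles.head) // safe because of the guard above
      addedFiles.map(fs.status)
    } else {
      Seq.empty
    }

  // Alternative: headOption plus map; same result, arguably harder to read.
  def commitWithHeadOption(addedFiles: Seq[String]): Seq[String] =
    addedFiles.headOption
      .map(fsFor)
      .map(fs => addedFiles.map(fs.status))
      .getOrElse(Seq.empty)
}
```

Both versions skip creating the handle for an empty write. The guarded form keeps the "first file picks the file system, all files get a status" intent visible, which is probably why the headOption form feels odd here.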
  }
}

import org.apache.spark.sql.execution.datasources.OutputWriter
Why is this down here?
It's not. This is the top.
oh... i see
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils

class FileStreamSinkSuite extends StreamTest {
  import testImplicits._

  test("FileStreamSinkWriter - unpartitioned data") {
What about these tests?
They were testing code that has been deleted completely, so they are now purely redundant with all the tests we have for the batch write path.
LGTM
Thanks, merging to master.
## What changes were proposed in this pull request?

This patch adds a new commit protocol implementation, ManifestFileCommitProtocol, that follows the existing streaming flow, and uses it in FileStreamSink to consolidate the write path in structured streaming with the batch mode write path.

This deletes a lot of code, and would make it trivial to support other functionalities that are currently available in batch but not in streaming, including all file formats and bucketing.

## How was this patch tested?

Should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes apache#15710 from rxin/SPARK-18025.
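As a rough illustration of the idea behind a manifest-based commit protocol, the sketch below models it without Spark. Everything in it is an assumption made for illustration: `Manifest`, `TaskCommitter`, `TaskResult`, and `commitJob` are hypothetical names, and the sketch assumes that tasks report the files they wrote and that a batch becomes visible only when the job-level commit records those files in the manifest. It is not the actual ManifestFileCommitProtocol code.

```scala
import scala.collection.mutable

object ManifestCommitSketch {
  // Hypothetical message a task sends back to the driver on commit.
  final case class TaskResult(files: Seq[String])

  // Hypothetical in-memory manifest: batch id -> files committed in that batch.
  final class Manifest {
    private val entries = mutable.LinkedHashMap.empty[Long, Seq[String]]

    // Records a batch once; returns false if the batch id was already recorded,
    // so replaying a batch does not add its files a second time.
    def add(batchId: Long, files: Seq[String]): Boolean =
      if (entries.contains(batchId)) false
      else { entries.put(batchId, files); true }

    // Readers trust only the files listed here.
    def visibleFiles: Seq[String] = entries.values.flatten.toSeq
  }

  // Per-task side: remember every file handed out, report them on commit.
  final class TaskCommitter {
    private val added = mutable.ArrayBuffer.empty[String]
    def newFile(name: String): String = { added += name; name }
    def commit(): TaskResult = TaskResult(added.toList)
  }

  // Job side: gather all task results and record them as a single batch entry.
  def commitJob(manifest: Manifest, batchId: Long, results: Seq[TaskResult]): Boolean =
    manifest.add(batchId, results.flatMap(_.files))
}
```

In this sketch, files written by a task stay invisible to readers until `commitJob` succeeds, and a second `commitJob` call for the same batch id is a no-op.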