Skip to content

[SPARK-18025] Use commit protocol API in structured streaming #15710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from

Conversation

rxin
Copy link
Contributor

@rxin rxin commented Nov 1, 2016

What changes were proposed in this pull request?

This patch adds a new commit protocol implementation ManifestFileCommitProtocol that follows the existing streaming flow, and uses it in FileStreamSink to consolidate the write path in structured streaming with the batch mode write path.

This deletes a lot of code, and would make it trivial to support other functionalities that are currently available in batch but not in streaming, including all file formats and bucketing.

How was this patch tested?

Should be covered by existing tests.

@rxin
Copy link
Contributor Author

rxin commented Nov 1, 2016

cc @ericl, @marmbrus, @zsxwing and @lw-lin (I guess this would supersede your old PR).

isAppend)

WriteOutput.write(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking I should just rename WriteOutput to FileFormatOutput

@SparkQA
Copy link

SparkQA commented Nov 1, 2016

Test build #3387 has finished for PR 15710 at commit e9823e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 1, 2016

Test build #67873 has finished for PR 15710 at commit e9823e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 1, 2016

Test build #67874 has finished for PR 15710 at commit 1c906c9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 1, 2016

Test build #67877 has finished for PR 15710 at commit 1c3a645.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 1, 2016

Test build #67913 has finished for PR 15710 at commit a2ea180.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 1, 2016

Test build #67912 has finished for PR 15710 at commit 0742318.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
if (addedFiles.nonEmpty) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just an optimization to avoid instantiating the fs for empty writes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was copying the same logic from before -- but i think so...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually the other thing is that we are using the head. Technically we can use headOption and than map over it but it will be pretty weird ..

}
}

import org.apache.spark.sql.execution.datasources.OutputWriter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this down here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not. This is the top.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh... i see

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils

class FileStreamSinkSuite extends StreamTest {
import testImplicits._


test("FileStreamSinkWriter - unpartitioned data") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about these tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They were testing code that's been deleted completely and is now purely redundant with all the tests we have for the batch write path.

@marmbrus
Copy link
Contributor

marmbrus commented Nov 2, 2016

LGTM

@marmbrus
Copy link
Contributor

marmbrus commented Nov 2, 2016

Thanks, merging to master.

@asfgit asfgit closed this in 77a9816 Nov 2, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
## What changes were proposed in this pull request?
This patch adds a new commit protocol implementation ManifestFileCommitProtocol that follows the existing streaming flow, and uses it in FileStreamSink to consolidate the write path in structured streaming with the batch mode write path.

This deletes a lot of code, and would make it trivial to support other functionalities that are currently available in batch but not in streaming, including all file formats and bucketing.

## How was this patch tested?
Should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes apache#15710 from rxin/SPARK-18025.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants