
[SPARK-26785][SQL] data source v2 API refactor: streaming write #23702


Closed · cloud-fan wants to merge 1 commit into apache:master from cloud-fan/stream-write

Conversation

cloud-fan (Contributor)

What changes were proposed in this pull request?

Continue the API refactor for streaming write, according to the [doc](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing).

The major changes:

  1. rename `StreamingWriteSupport` to `StreamingWrite`
  2. add `WriteBuilder.buildForStreaming`
  3. update existing sinks to move the creation of `StreamingWrite` to `Table` (a rough sketch follows below)
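
A rough sketch of the resulting shape, in simplified Scala; the signatures here are assumptions based on the list above, not the exact Spark source:

```scala
// Simplified sketch of the refactored streaming write path.
// WriterCommitMessage is stubbed; the real interfaces are richer.
trait WriterCommitMessage

trait StreamingWrite { // renamed from StreamingWriteSupport
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit
  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit
}

trait WriteBuilder {
  // New in this PR: the streaming counterpart of buildForBatch.
  def buildForStreaming(): StreamingWrite =
    throw new UnsupportedOperationException("Streaming write is not supported")
}

// Sinks now create the StreamingWrite through the Table abstraction
// rather than implementing a separate StreamingWriteSupport.
trait Table {
  def newWriteBuilder(): WriteBuilder // simplified signature
}
```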

How was this patch tested?

existing tests

SparkQA commented Jan 30, 2019

Test build #101901 has finished for PR 23702 at commit 2a88926.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

dongjoon-hyun (Member)

WriterCommitMessage.java seems to be the last file still referencing the old StreamingWriteSupport.

$ git grep StreamingWriteSupport
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java:import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java: * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}.

SparkQA commented Jan 31, 2019

Test build #101932 has finished for PR 23702 at commit bac44f7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor, Author)

retest this please

SparkQA commented Jan 31, 2019

Test build #101942 has finished for PR 23702 at commit bac44f7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jan 31, 2019

Test build #101953 has finished for PR 23702 at commit 14890d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan (Contributor, Author)

cc @jose-torres @rdblue @gatorsmile

    case s: SupportsStreamingWrite =>
      // TODO: we should translate OutputMode to concrete write actions like truncate, but
      // the truncate action is being developed in SPARK-26666.
      WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan)
Contributor

This also needs to validate that OutputMode is not Update and throw an exception if it is. OutputMode.Update can lead to undefined behavior.
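
For concreteness, the requested guard might look roughly like this (a hedged sketch; the helper and its call site are assumptions, not code from this PR):

```scala
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical guard at the point where a v2 sink is configured.
// OutputMode.Update is rejected because the v2 API has no way to tell
// the sink which rows an update should replace.
def validateOutputMode(sinkName: String, outputMode: OutputMode): Unit = {
  if (outputMode == OutputMode.Update()) {
    throw new UnsupportedOperationException(
      s"Sink $sinkName does not support OutputMode.Update: " +
        "the update key is undefined")
  }
}
```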

Contributor

Won't this mean that DataSourceV2 writers can't handle the full DataStreamWriter interface?

Contributor (Author)

I'm not very familiar with streaming internals, so I don't know the exact semantics of this UPDATE mode.

I believe we can treat it like SaveMode: replace it with the new write operators (Append, Truncate, OverwriteByExpression, etc.) and make the semantics clear.

Anyway, this is just a TODO comment; we can discuss it further in the PR that addresses this TODO.

Contributor

This wasn't addressed. Although my review input was on a comment line, I wasn't saying that this needs to be done when the TODO item is done. The v2 implementation needs to fail when passed a dangerous config setting.

cloud-fan (Contributor, Author) commented Feb 19, 2019

I spent more time looking into the streaming output mode. The UPDATE mode is by design and is useful when the watermark is not specified, i.e. when users don't want to drop any record, no matter how late it arrives.

The output mode is applied to the entire streaming query, so all the stateful operators and the sinks need to support it.

StateStoreSaveExec supports UPDATE mode with well-defined semantics: it saves all input records to the state store and outputs them to downstream operators, so a downstream aggregate/join can produce results earlier. E.g. the first epoch outputs

| date     | count |
|----------|-------|
| 2018-1-1 | 1     |
| 2018-1-2 | 2     |

and the next epoch outputs

| date     | count |
|----------|-------|
| 2018-1-2 | 1     |
| 2018-1-3 | 3     |

However, if there are two aggregates, the second aggregate can't support UPDATE mode and deal with the early results from the first aggregate. Fortunately, we disallow more than one streaming aggregate in a query, so there is no problem.

The other problem is that none of the existing sinks supports UPDATE mode. They either ignore the output mode completely or treat UPDATE the same as APPEND. I think they should fail if the output mode is UPDATE, as they don't really support it.

Overall, I think UPDATE mode is useful but hard to implement. We need a mechanism to propagate the "update key" to downstream stateful operators and the sink. That's why I think no source can support UPDATE mode unless we improve the streaming framework to propagate the "update key".
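
A toy illustration of the point (nothing here is Spark API): a sink can apply the epochs from the example above only if it knows the update key.

```scala
// Toy model of a sink consuming the epochs shown in the tables above.
// Knowing that "date" is the update key, the sink can upsert; without
// that knowledge it cannot tell an update from a brand-new row.
import scala.collection.mutable

case class Result(date: String, count: Long)

val state = mutable.Map.empty[String, Long]

def applyEpoch(epoch: Seq[Result]): Unit =
  epoch.foreach(r => state(r.date) = r.count) // upsert by the key "date"

applyEpoch(Seq(Result("2018-1-1", 1), Result("2018-1-2", 2)))
applyEpoch(Seq(Result("2018-1-2", 1), Result("2018-1-3", 3)))
// state is now: 2018-1-1 -> 1, 2018-1-2 -> 1, 2018-1-3 -> 3
```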

Is it possible to remove UPDATE mode completely and re-add it when we have a detailed design? cc @rxin @marmbrus @tdas @jose-torres

Contributor

Sorry for being unclear. My comment was in response to wenchen's suggestion that we "remove UPDATE mode completely and re-add it when we have a detailed design?".

The feature has utility even without passing the key. That said, I agree it would be better if the key is available in the V2 API.

Contributor

Okay, thanks for the clarification. I have no problem keeping this as-is in v1. I just want to make sure we fix those problems when we introduce the features in v2. Given that we have a proposal for how to do that, I think we should use it instead of continuing to add code that uses an unreliable mechanism.

Contributor

Perhaps I should say this more strongly. It would be much better if we pass the key information in a unified way. We just didn't do it because we didn't want to be changing V1, given the belief that V2 was coming.

Contributor (Author)

Removing UPDATE mode was not my suggestion. I just raised it and waited for someone to say no.

> I've already proposed a way to do that.

I thought your proposal was to forbid UPDATE mode in all streaming sinks, and it seems I was wrong. To confirm, you mean we should only forbid UPDATE mode in v2 sinks, right? @rdblue

rdblue (Contributor) commented Feb 20, 2019

@cloud-fan, I don't want to remove the update feature. I want the v2 API to correctly communicate how to update records. That requires passing the update key.

Consequently, configuring a sink using OutputMode.Update is unsafe. It doesn't communicate the key. My position is (and has been) that we cannot use OutputMode.Update to configure v2 sinks because the update behavior -- which rows should be replaced -- is undefined.

Instead, I proposed a mix-in to the WriteBuilder:

public interface SupportsUpdate extends WriteBuilder {
  WriteBuilder updateByKey(String... columns);
}

Clearly, we might not want to use String... but this shows a simple way to communicate the key columns.
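
For illustration, a sketch of how the engine side might consume such a mix-in instead of passing OutputMode.Update. It reuses the WriteBuilder/StreamingWrite shapes sketched earlier; everything except buildForStreaming (which this PR adds) is assumed:

```scala
// Scala mirror of the proposal above, plus the hypothetical dispatch the
// engine could perform instead of accepting OutputMode.Update.
trait SupportsUpdate extends WriteBuilder {
  def updateByKey(columns: String*): WriteBuilder
}

def buildUpdateWrite(builder: WriteBuilder,
                     keyColumns: Seq[String]): StreamingWrite =
  builder match {
    case u: SupportsUpdate =>
      // The key tells the sink which rows each record replaces.
      u.updateByKey(keyColumns: _*).buildForStreaming()
    case _ =>
      throw new UnsupportedOperationException(
        "Sink does not declare update-key support; update mode is unsafe")
  }
```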

jose-torres (Contributor) left a comment

The changes look functionally correct to me. I haven't been keeping up to date with the API design discussions, so I'm trusting (and excluding from my review) that the names and class structure match what was agreed to.

SparkQA commented Feb 16, 2019

Test build #102413 has finished for PR 23702 at commit 80036ad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

gatorsmile (Member) left a comment

LGTM

Thanks! Merged to master.

asfgit closed this in f85ed9a on Feb 19, 2019
rdblue (Contributor) commented Feb 19, 2019

@rxin, @cloud-fan, @gatorsmile, please take the time to address comments with the community. Wenchen just responded to my review comment 2 days ago, over the weekend. The problem I raised wasn't addressed and I (yet again) don't think it is appropriate to merge issues without a resolution.

@cloud-fan, would you please follow up with a PR that specifically disallows passing the unsafe mode?

mccheah pushed a commit to palantir/spark that referenced this pull request May 15, 2019
## What changes were proposed in this pull request?

Continue the API refactor for streaming write, according to the [doc](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing).

The major changes:
1. rename `StreamingWriteSupport` to `StreamingWrite`
2. add `WriteBuilder.buildForStreaming`
3. update existing sinks, to move the creation of `StreamingWrite` to `Table`

## How was this patch tested?

existing tests

Closes apache#23702 from cloud-fan/stream-write.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
cloud-fan added a commit that referenced this pull request May 20, 2020
### What changes were proposed in this pull request?

This PR adds a private `WriteBuilder` mixin trait: `SupportsStreamingUpdate`, so that the builtin v2 streaming sinks can still support the update mode.

Note: it's private because we don't have a proper design yet. I didn't take the proposal in #23702 (comment) because we may want something more general, like updating by an expression `key1 = key2 + 10`.
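
Roughly, the trait described here would be a one-method mix-in on the WriteBuilder sketched earlier (a sketch; the member name is an assumption, and the real trait is private to Spark's sql package):

```scala
// Private mix-in letting the builtin v2 sinks opt into update mode
// until a proper public design exists.
trait SupportsStreamingUpdate extends WriteBuilder {
  // Opt this write into update semantics.
  def update(): WriteBuilder
}
```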

### Why are the changes needed?

In Spark 2.4, all builtin v2 streaming sinks support all streaming output modes, and v2 sinks are enabled by default, see https://issues.apache.org/jira/browse/SPARK-22911

It's too risky for 3.0 to go back to v1 sinks, so I propose to add a private trait to fix builtin v2 sinks, to keep backward compatibility.

### Does this PR introduce _any_ user-facing change?

Yes, now all the builtin v2 streaming sinks support all streaming output modes, which is the same as 2.4

### How was this patch tested?

existing tests.

Closes #28523 from cloud-fan/update.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 34414ac)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>