[SPARK-26785][SQL] data source v2 API refactor: streaming write #23702
Conversation
retest this please
```scala
WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, writer), newAttributePlan)

case s: SupportsStreamingWrite =>
  // TODO: we should translate OutputMode to concrete write actions like truncate, but
  // the truncate action is being developed in SPARK-26666.
```
This also needs to validate that OutputMode is not Update and throw an exception if it is. OutputMode.Update can lead to undefined behavior.
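A minimal sketch of the kind of guard being asked for, assuming it runs somewhere the sink name and output mode are both in scope; the method name and error message are illustrative, not the actual Spark code:

```scala
import org.apache.spark.sql.streaming.OutputMode

// Illustrative guard only: fail fast when a v2 sink is configured with Update mode,
// instead of silently treating it like Append.
def validateOutputMode(sinkName: String, outputMode: OutputMode): Unit = {
  if (outputMode == OutputMode.Update()) {
    throw new UnsupportedOperationException(
      s"Sink $sinkName was configured with OutputMode.Update, but the v2 streaming " +
        "write path does not carry an update key, so the replace behavior is undefined.")
  }
}
```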
Won't this mean that DataSourceV2 writers can't handle the full DataStreamWriter interface?
I'm not very familiar with streaming internals, so I don't know the exact semantics of this UPDATE mode.
I believe we can treat it like `SaveMode`: replace it with the new write operators (Append, Truncate, OverwriteByExpression, etc.) and make the semantics clear (sketched below).
Anyway, this is just a TODO comment; we can have more discussion in the PR that addresses this TODO.
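To make that concrete, here is a rough Scala sketch of such a translation; the action names follow the operators mentioned above, while the types and the mapping itself are hypothetical:

```scala
// Hypothetical sketch: map each streaming output mode to an explicit write action
// instead of threading the mode itself through to the sink.
sealed trait StreamWriteAction
case object AppendOnly extends StreamWriteAction        // OutputMode.Append
case object TruncateAndAppend extends StreamWriteAction // OutputMode.Complete: truncate, then write all results
case class UpsertByKey(keys: Seq[String]) extends StreamWriteAction // what Update would need to become

def translateOutputMode(mode: String): StreamWriteAction = mode.toLowerCase match {
  case "append"   => AppendOnly
  case "complete" => TruncateAndAppend
  case "update"   =>
    // The mode alone carries no key, so there is nothing to build an upsert from --
    // exactly the gap discussed later in this thread.
    throw new IllegalArgumentException("update mode needs an explicit key to become an upsert")
}
```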
This wasn't addressed. Although my review input was on a comment line, I wasn't saying that this needs to be done when the TODO item is done. The v2 implementation needs to fail when passed a dangerous config setting.
I spent more time looking into the streaming output mode. The UPDATE mode is by design and is useful when the watermark is not specified, i.e. when users don't want to drop any record no matter how late it is.
The output mode applies to the entire streaming query, so all the stateful operators and the sinks need to support it.
`StateStoreSaveExec` supports UPDATE mode with well-defined semantics: it saves all the input records to the state store and outputs them to downstream operators, so a downstream aggregate/join can produce results earlier. E.g. the first epoch outputs
| date | count |
|----------|-------|
| 2018-1-1 | 1 |
| 2018-1-2 | 2 |
and the next epoch outputs
| date | count |
|----------|-------|
| 2018-1-2 | 1 |
| 2018-1-3 | 3 |
However, if there are two aggregates, the second aggregate can't support UPDATE mode and handle the early results from the first aggregate. Fortunately, we disallow more than one streaming aggregate in a query, so there is no problem.
The other problem is that none of the existing sinks support UPDATE mode. They either ignore the output mode completely, or treat UPDATE the same as APPEND. I think they should fail if the output mode is UPDATE, as they don't really support it.
Overall, I think UPDATE mode is useful but hard to implement. We need a mechanism to propagate the "update key" to downstream stateful operators and the sink. That's why I think no sink can support UPDATE mode unless we improve the streaming framework to propagate the "update key".
Is it possible to remove UPDATE mode completely and re-add it when we have a detailed design? cc @rxin @marmbrus @tdas @jose-torres
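To make the per-epoch behavior above concrete, here is a small self-contained example using only the public streaming API; the rate source, console sink, and window size are arbitrary demo choices:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

// In "update" mode each micro-batch emits only the groups whose counts changed,
// like the two epoch tables shown in the comment above.
object UpdateModeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("update-mode-demo").master("local[2]").getOrCreate()

    val events = spark.readStream
      .format("rate")                   // built-in test source: emits (timestamp, value) rows
      .option("rowsPerSecond", "5")
      .load()

    val counts = events
      .groupBy(window(events("timestamp"), "10 seconds"))
      .count()

    val query = counts.writeStream
      .outputMode("update")             // emit only the changed groups each epoch
      .format("console")                // console sink prints each micro-batch
      .start()

    query.awaitTermination()
  }
}
```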
Sorry for being unclear. My comment was in response to wenchen's suggestion that we "remove UPDATE mode completely and re-add it when we have a detailed design?".
The feature has utility even without passing the key. That said, I agree it would be better if the key is available in the V2 API.
Okay, thanks for the clarification. I have no problem keeping this as-is in v1. I just want to make sure we fix those problems when we introduce the features in v2. Given that we have a proposal for how to do that, I think we should use it instead of continuing to add code that uses an unreliable mechanism.
Perhaps I should say this more strongly: it would be much better if we passed the key information in a unified way. We just didn't do it because we didn't want to change V1, given the belief that V2 was coming.
Removing UPDATE mode was not my suggestion. I just raised it and waited for someone to say no.

> I've already proposed a way to do that.

I thought your proposal was to forbid UPDATE mode in all streaming sinks, and it seems I was wrong. To confirm, you mean we should only forbid UPDATE mode in v2 sinks, right? @rdblue
@cloud-fan, I don't want to remove the update feature. I want the v2 API to correctly communicate how to update records. That requires passing the update key.
Consequently, configuring a sink using `OutputMode.Update` is unsafe. It doesn't communicate the key. My position is (and has been) that we cannot use `OutputMode.Update` to configure v2 sinks because the update behavior -- which rows should be replaced -- is undefined.

Instead, I proposed a mix-in to the `WriteBuilder`:
```java
public interface SupportsUpdate extends WriteBuilder {
  WriteBuilder updateByKey(String... column);
}
```
Clearly, we might not want to use `String...`, but this shows a simple way to communicate the key columns.
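To show how a sink might consume such a mix-in, here is a hedged Scala rendering with a toy builder; `WriteBuilder` is reduced to an empty stub so the sketch stands alone, and `KeyedMemorySinkBuilder` is entirely hypothetical:

```scala
// Minimal stand-in so the sketch compiles on its own; the real DSv2 WriteBuilder
// has more methods.
trait WriteBuilder

// Scala rendering of the mix-in proposed above: the sink is told the update key
// explicitly instead of inferring it from OutputMode.Update.
trait SupportsUpdate extends WriteBuilder {
  def updateByKey(columns: String*): WriteBuilder
}

// Hypothetical key-aware sink builder: it records the key columns and could later
// build a write that replaces rows matching on those columns.
class KeyedMemorySinkBuilder extends SupportsUpdate {
  private var keyColumns: Seq[String] = Nil

  override def updateByKey(columns: String*): WriteBuilder = {
    keyColumns = columns
    this
  }
}
```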
The changes look functionally correct to me. I haven't been keeping up to date with the API design discussions, so I'm trusting (and excluding from my review) that the names and class structure match what was agreed to.
LGTM
Thanks! Merged to master.
@rxin, @cloud-fan, @gatorsmile, please take the time to address comments with the community. Wenchen just responded to my review comment 2 days ago, over the weekend. The problem I raised wasn't addressed and I (yet again) don't think it is appropriate to merge issues without a resolution. @cloud-fan, would you please follow up with a PR that specifically disallows passing the unsafe mode?
## What changes were proposed in this pull request?

Continue the API refactor for streaming write, according to the [doc](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing). The major changes:
1. rename `StreamingWriteSupport` to `StreamingWrite`
2. add `WriteBuilder.buildForStreaming`
3. update existing sinks, to move the creation of `StreamingWrite` to `Table`

## How was this patch tested?

existing tests

Closes apache#23702 from cloud-fan/stream-write.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
### What changes were proposed in this pull request?

This PR adds a private `WriteBuilder` mixin trait, `SupportsStreamingUpdate`, so that the builtin v2 streaming sinks can still support the update mode. Note: it's private because we don't have a proper design yet. I didn't take the proposal in #23702 (comment) because we may want something more general, like updating by an expression `key1 = key2 + 10`.

### Why are the changes needed?

In Spark 2.4, all builtin v2 streaming sinks support all streaming output modes, and v2 sinks are enabled by default, see https://issues.apache.org/jira/browse/SPARK-22911. It's too risky for 3.0 to go back to v1 sinks, so I propose to add a private trait to fix builtin v2 sinks, to keep backward compatibility.

### Does this PR introduce _any_ user-facing change?

Yes, now all the builtin v2 streaming sinks support all streaming output modes, which is the same as 2.4.

### How was this patch tested?

existing tests.

Closes #28523 from cloud-fan/update.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
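The note about wanting "something more general, like updating by an expression" suggests a different shape of mix-in; a purely hypothetical sketch of that flavor (not the actual private trait added in Spark) might look like this:

```scala
// Stand-in for the DSv2 WriteBuilder so the sketch is self-contained.
trait WriteBuilder

// Hypothetical, more general variant hinted at above: instead of naming key columns,
// the sink receives a matching condition (e.g. "key1 = key2 + 10") and decides which
// stored rows each incoming row replaces.
trait SupportsUpdateByExpression extends WriteBuilder {
  def updateWhere(condition: String): WriteBuilder
}
```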
What changes were proposed in this pull request?

Continue the API refactor for streaming write, according to the [doc](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing).

The major changes:
1. rename `StreamingWriteSupport` to `StreamingWrite`
2. add `WriteBuilder.buildForStreaming`
3. update existing sinks, to move the creation of `StreamingWrite` to `Table`

How was this patch tested?

existing tests
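Putting the renamed pieces together, here is a simplified sketch of the post-refactor wiring described above; the trait bodies are stripped down to stand alone (the real DSv2 interfaces are Java and take options/schema arguments), and `LoggingSinkTable` is a made-up sink:

```scala
// Simplified stand-ins for the interfaces named in the description; the real
// interfaces have more members and different signatures.
trait StreamingWrite
trait WriteBuilder {
  // Added by this refactor: the streaming counterpart of building a batch write.
  def buildForStreaming(): StreamingWrite
}
trait Table {
  def name: String
  // After the refactor, the Table itself hands out write builders, so the creation
  // of the StreamingWrite moves from a separate WriteSupport into the Table.
  def newWriteBuilder(): WriteBuilder
}

// Made-up sink showing the wiring: Table -> WriteBuilder -> StreamingWrite.
class LoggingSinkTable extends Table {
  override def name: String = "logging-sink"

  override def newWriteBuilder(): WriteBuilder = new WriteBuilder {
    override def buildForStreaming(): StreamingWrite = new StreamingWrite {}
  }
}
```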