[SPARK-26956][SS] remove streaming output mode from data source v2 APIs #23859
Conversation
case Update =>
  // Although no v2 sinks really support Update mode now, but during tests we do want them
  // to pretend to support Update mode, and treat Update mode same as Append mode.
This is very useful when testing with aggregate/join. We don't want to complicate the test cases using watermarks, and we can't use complete mode as some sinks don't support it.
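For readers following along, here is a simplified sketch (not the actual Spark internals) of the engine-side decision this diff sits in: Append builds the write directly, Complete is expressed as truncate-then-append, and Update is treated like Append purely so test sinks can pretend to support it. Class and package names follow the Spark 3.x connector API and may differ from the sources.v2 packages this PR modified.

```scala
import org.apache.spark.sql.connector.write.{SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.streaming.OutputMode

// Simplified, hypothetical version of the engine-side mapping; not Spark's real code.
def createStreamingWrite(builder: WriteBuilder, mode: OutputMode): StreamingWrite = mode match {
  // Append: appending is the default, so just build the write.
  case m if m == OutputMode.Append() =>
    builder.buildForStreaming()

  // Complete: expressed as "truncate the old data, then append this epoch's result".
  case m if m == OutputMode.Complete() =>
    builder match {
      case t: SupportsTruncate => t.truncate().buildForStreaming()
      case other => throw new IllegalArgumentException(s"$other does not support truncate (Complete mode)")
    }

  // Update: treated the same as Append here, purely so test sinks can pretend to
  // support it; real queries in Update mode fail against v2 sinks after this PR.
  case m if m == OutputMode.Update() =>
    builder.buildForStreaming()

  case other =>
    throw new IllegalArgumentException(s"Unsupported output mode: $other")
}
```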
Test build #102581 has finished for PR 23859 at commit
Do we have any docs describing the background/discussion around this change? It doesn't sound like a small change and it directly impacts Structured Streaming, so I would like to fully understand it.
Looks like we will have only new data then. Is that correct, or should it be fixed to append all data instead of only the new data?
I guess this means we don't separate keys and values when passing data to the sink, so the sink cannot perform an upsert (though the target system can still upsert if it knows about keys and values). So SupportsUpdate would get keys and values separately. Do I understand correctly?
Is there any design doc on what we plan to support in the new model? If we are redesigning, it might be worth re-evaluating the need for these different modes before proceeding with the implementation. For instance, streaming sinks cannot correctly handle updates without support for retractions. And I'm not sure how truncate is going to be used. Do we expect the sink to completely truncate the output and fill it with the new values for each micro-batch? I am not sure how that's very useful other than maybe in the console sink. I think ideally what we want is for an operator to emit a stream of values (and retractions) to the sink so that the sink can correctly process and update the results. So maybe the different modes are not needed other than for backward compatibility reasons.
@HeartSaVioR this proposal was first discussed in a data source v2 community meeting, and then in this doc. The recent discussion happened in this PR. In general, Update mode is not supported right now. The streaming framework is not able to propagate the "update key" to the sink. But users can look at the query manually and hardcode the "update key" in their custom sink, so we leave it in streaming data source v1, but not v2, as it's too hacky. @arunmahadevan you are right that the modes other than Append are not very useful and are mostly used for debugging. However, Complete mode has a very clear semantic (truncate all the old data and write the new data), and we already have the `SupportsTruncate` API to express it.
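Since several later comments come back to this "hardcode the update key in a custom sink" pattern, here is a hedged sketch of what it typically looks like in user code, using foreachBatch for brevity; `upsertIntoStore`, the "word" key column, and the checkpoint path are made-up illustration names, not Spark APIs.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ManualUpsertSketch {
  // Hypothetical user function: upsert rows into some external key-value store,
  // using a key column the query author knows ("word"), because the engine does
  // not communicate the update key to the sink.
  def upsertIntoStore(batch: DataFrame, keyColumn: String): Unit = {
    batch.collect().foreach { row =>
      // e.g. store.put(row.getAs[String](keyColumn), row.getAs[Long]("count"))
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("manual-upsert-sketch").getOrCreate()
    import spark.implicits._

    val counts = spark.readStream.format("rate").load()          // built-in test source
      .select(($"value" % 10).cast("string").as("word"))
      .groupBy("word")
      .count()

    // Update mode emits only the keys whose counts changed in each micro-batch;
    // the user-supplied function performs the actual upsert.
    val doUpsert: (DataFrame, Long) => Unit = (batch, _) => upsertIntoStore(batch, "word")

    counts.writeStream
      .outputMode("update")
      .option("checkpointLocation", "/tmp/manual-upsert-ckpt")   // placeholder path
      .foreachBatch(doUpsert)
      .start()
      .awaitTermination()
  }
}
```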
retest this please
I'm sorry but I can't find any mention of complete mode in either the doc or the PR you linked. Btw, here's the explanation of complete mode:
I'd understand this semantic as the sink always receiving the entire result table at every batch, and your explanation of complete mode sounds like it works differently. I guess this is due to the fact that we can't expect upsert to work well with sinks, so we have to truncate everything, but then shouldn't we provide the updated entire result table including old data instead of only providing new data? Maybe there's confusion regarding the definition of "new data". Previously I would think the final result always contains every row the query outputs. I would like to be clear whether we keep this as it is, or let the sink decide.
Sorry I didn't quote the right comment: https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?disco=AAAACdk332A Let me just copy the comment here:
What is the difference? The sink always receives the entire result table at every batch, and is asked to remove all the old data. That's the expected end-to-end behavior of complete mode.
OK, got it. Turns out the meaning of "new data" was not clear to me (I confused it with the updated rows in the current batch). Thanks for making it clear! It might be better to clarify "new data" as "the updated entire result table" in the PR's description.
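To make the agreed meaning of "new data" concrete, here is a small usage-level example (the socket host and port are placeholders): in complete mode the console sink prints the updated entire result table, i.e. every word seen so far, at each trigger, not just the rows changed by the latest micro-batch.

```scala
import org.apache.spark.sql.SparkSession

object CompleteModeWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("complete-mode-wordcount").getOrCreate()
    import spark.implicits._

    // Placeholder source; any streaming source behaves the same way here.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    val counts = lines.as[String]
      .flatMap(_.split("\\s+"))
      .groupBy("value")
      .count()

    // Complete mode: at every trigger the sink receives the updated entire result
    // table and is expected to replace the old output, which the v2 API now
    // expresses as truncate + append.
    counts.writeStream
      .outputMode("complete")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```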
Test build #102618 has finished for PR 23859 at commit
Test build #102651 has finished for PR 23859 at commit
retest this please
Test build #102795 has finished for PR 23859 at commit
cc @rdblue
+1 Thanks for working on this, @cloud-fan! It looks good to me.
Thanks! Merged to master.
## What changes were proposed in this pull request?

Similar to `SaveMode`, we should remove streaming `OutputMode` from the data source v2 API, and use operations that have clear semantics.

The changes are:
1. append mode: create `StreamingWrite` directly. By default, the `WriteBuilder` will create `Write` to append data.
2. complete mode: call `SupportsTruncate#truncate`. Complete mode means truncating all the old data and appending the new data of the current epoch. `SupportsTruncate` has exactly the same semantics.
3. update mode: fail. The current streaming framework can't propagate the update keys, so v2 sinks are not able to implement update mode. In the future we can introduce a `SupportsUpdate` trait.

The behavior changes:
1. all the v2 sinks (foreach, console, memory, kafka, noop) don't support update mode. Previously, all the v2 sinks implemented update mode incorrectly; none of them can really support it.
2. the kafka sink doesn't support complete mode, since it can only append data.

## How was this patch tested?

existing tests

Closes apache#23859 from cloud-fan/update.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
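A self-contained toy model of change (2) above, deliberately not using Spark's real connector classes so it runs on its own: the engine asks the sink's write builder to truncate, and the sink drops its old data before appending the epoch's result table. All `Toy*` names are invented for this sketch.

```scala
// Toy traits that only mimic the shape of WriteBuilder / SupportsTruncate.
trait ToyWriteBuilder { def buildWrite(): ToyEpochWrite }
trait ToySupportsTruncate extends ToyWriteBuilder { def truncate(): ToyWriteBuilder }
trait ToyEpochWrite { def commit(epochId: Long, rows: Seq[String]): Unit }

final class ToyInMemorySink {
  private var data = Vector.empty[String]
  def clear(): Unit = { data = Vector.empty }
  def append(rows: Seq[String]): Unit = { data = data ++ rows }
  def contents: Vector[String] = data
}

// The sink opts into Complete mode by supporting truncation; in Append mode
// truncate() is simply never called, so old data is kept.
final class ToyInMemoryWriteBuilder(sink: ToyInMemorySink) extends ToySupportsTruncate {
  private var doTruncate = false

  override def truncate(): ToyWriteBuilder = { doTruncate = true; this }

  override def buildWrite(): ToyEpochWrite = new ToyEpochWrite {
    override def commit(epochId: Long, rows: Seq[String]): Unit = {
      if (doTruncate) sink.clear()  // Complete mode: drop the previous result first
      sink.append(rows)             // then write this epoch's entire result table
    }
  }
}

object ToyDemo extends App {
  val sink = new ToyInMemorySink
  // Two "Complete mode" epochs: each commit replaces the previous result.
  new ToyInMemoryWriteBuilder(sink).truncate().buildWrite().commit(0L, Seq("a:1"))
  new ToyInMemoryWriteBuilder(sink).truncate().buildWrite().commit(1L, Seq("a:2", "b:1"))
  println(sink.contents)  // Vector(a:2, b:1): only the latest result table survives
}
```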
@cloud-fan Then what happens to sorting operations, which can run only in complete mode? By truncating the data accumulated in the previous batch, does it imply we will apply sort operations only within the data available in the current micro-batch? If my assumption is right, aren't we going back to the DStream behaviour of applying window transformations over the batch interval?
The "new data" mentioned in this PR means the new result of running the query with the entire input, not a single micro-batch. This is the semantic of complete mode AFAIK.
Btw, is the concern based on a real-world workload? Because I cannot imagine "complete mode" working with a decent amount of traffic, especially if you're running the query for a long time. "Complete mode" means you cannot evict any state regardless of the watermark, which won't make sense unless you have a finite set of group keys (in which case the cardinality of the group keys defines the overall size of the state).
That's why "state" comes into play in Structured Streaming. The state retains the values across micro-batches, including "windows" in the case of window transformations. In fact, as previous comments in this PR already stated, the only mode that works without any tweak in production is append mode. In update mode you can tweak a custom sink to correctly upsert the output, but there's no API to define "group keys" in existing sinks. Btw, the streaming output mode is all about how to emit output for stateful operations. If you don't do any stateful operation, the output mode is a no-op.
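To illustrate the point about state and append mode, a sketch using the built-in rate source (the rate, watermark delay, and window size are arbitrary):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

object AppendModeWindowedCounts {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("append-mode-windowed-counts").getOrCreate()

    // The built-in "rate" test source produces (timestamp, value) rows.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()

    // The state keeps each window's partial count across micro-batches; the
    // watermark bounds that state so it can eventually be dropped.
    val counts = events
      .withWatermark("timestamp", "10 minutes")
      .groupBy(window(col("timestamp"), "5 minutes"))
      .count()

    // Append mode: a window's count is emitted exactly once, after the watermark
    // guarantees no more late data can change it, so the sink only ever appends.
    counts.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
```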
@cloud-fan Just to be clear on the impact of this change, v2 sources like

In addition, I have the same question for Kafka. If someone runs a streaming query writing to Kafka in update mode on Spark 2.4, will it still work in Spark 3.0 under default settings?
I guess this change originated from #23702 - if I understand correctly, the precondition of this change is that we will bring in an API for update (strictly speaking, upsert) mode, but that doesn't seem to have happened. As of now, this change would discontinue some use cases where the owners of the query want to deal with upsert manually, as @marmbrus pointed out in #23702. That goes beyond the concern about breaking existing queries, because there's another aspect of streaming output mode: the latency of output, one of the core points of streaming. Users may be willing to deal with upsert themselves (even though Spark doesn't support it natively) if their use case is critical on reflecting updated information on time. They can deal with it explicitly via the foreach sink (or a custom sink which handles the upsert), or implicitly via a storage-side feature like compacted topics. Even if we'd like to tolerate the breaking change to save some time for a better design of the update API, the change should be called out in the release notes and the Structured Streaming migration guide, and the Structured Streaming programming guide should be modified as well.
If the same query works in 2.4 but not 3.0, it's a breaking change, and we should treat it seriously. I'm fixing it in #28523.
-1 for adding this back. The update mode is ambiguous and doesn't tell the source which rows to replace in the output. When we originally removed it, there were no sources that actually implemented it. What is the source that has a behavior change? Are we sure that it is due to this?
I think you mean sinks. Any source can be read from and then processed in update mode. I know of a lot of production use cases that use the

I'm also not sure I would say it's "ambiguous". For any given query it's clear what the key is, we just don't have machinery to automatically indicate it to the sink. The lack of that machinery, though, does not seem like a good reason to break cases that were working just fine before 3.0.
I'm not suggesting we break for 3.0. I just don't think that it's a good idea to compromise the DSv2 API for this reason -- if we don't communicate what gets replaced then there is no way to have reliable behavior. I think @tdas has suggested a good work-around, so let's go with that.
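Purely for illustration of what "communicating what gets replaced" could look like, here is a hypothetical capability trait in the spirit of the SupportsUpdate idea mentioned earlier in the thread; no such API exists in Spark, and the name and shape are made up.

```scala
// Hypothetical only; modeled loosely on the builder-capability style of SupportsTruncate.
trait HypotheticalWriteBuilder {
  def buildStreamingWrite(): AnyRef  // stand-in for a real StreamingWrite
}

trait HypotheticalSupportsUpdate extends HypotheticalWriteBuilder {
  // The engine would declare which columns identify the row to be replaced, so
  // the sink can upsert on those keys instead of appending blindly.
  def update(keyColumns: Seq[String]): HypotheticalWriteBuilder
}
```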
(Just 2 cents: we'd better not use "source" as a shortcut for "data source", as it causes confusion with the "source" in source/sink.)
What changes were proposed in this pull request?

Similar to `SaveMode`, we should remove streaming `OutputMode` from the data source v2 API, and use operations that have clear semantics.

The changes are:
1. append mode: create `StreamingWrite` directly. By default, the `WriteBuilder` will create `Write` to append data.
2. complete mode: call `SupportsTruncate#truncate`. Complete mode means truncating all the old data and appending the new data of the current epoch. `SupportsTruncate` has exactly the same semantics.
3. update mode: fail. The current streaming framework can't propagate the update keys, so v2 sinks are not able to implement update mode. In the future we can introduce a `SupportsUpdate` trait.

The behavior changes:
1. all the v2 sinks (foreach, console, memory, kafka, noop) don't support update mode. Previously, all the v2 sinks implemented update mode incorrectly; none of them can really support it.
2. the kafka sink doesn't support complete mode, since it can only append data.

How was this patch tested?

existing tests
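A usage-level sketch of the behavior changes listed above; the broker address, topic names, and checkpoint paths are placeholders. The Kafka sink can only append, so after this change a complete-mode query targeting it is expected to be rejected at start, while an append-mode query is unaffected.

```scala
import org.apache.spark.sql.SparkSession

object KafkaSinkModes {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-sink-modes").getOrCreate()
    import spark.implicits._

    val rates = spark.readStream.format("rate").load()

    // Still works: append mode, each micro-batch's new rows are appended to the topic.
    rates.selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "demo-append")
      .option("checkpointLocation", "/tmp/demo-append-ckpt")
      .outputMode("append")
      .start()

    // Expected to be rejected after this PR: the Kafka sink cannot truncate a
    // topic, so it cannot express complete mode.
    rates.groupBy(($"value" % 10).as("bucket")).count()
      .selectExpr("CAST(count AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "demo-complete")
      .option("checkpointLocation", "/tmp/demo-complete-ckpt")
      .outputMode("complete")
      .start()

    spark.streams.awaitAnyTermination()
  }
}
```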