[SPARK-26956][SS] remove streaming output mode from data source v2 APIs #23859


Closed
cloud-fan wants to merge 1 commit into apache:master from cloud-fan/update

Conversation

cloud-fan
Contributor

@cloud-fan cloud-fan commented Feb 21, 2019

What changes were proposed in this pull request?

Similar to SaveMode, we should remove streaming OutputMode from the data source v2 API and use operations that have clear semantics.

The changes are:

  1. append mode: create a StreamingWrite directly. By default, the WriteBuilder will create a Write that appends data.
  2. complete mode: call SupportsTruncate#truncate. Complete mode means truncating all the old data and appending the new data of the current epoch; SupportsTruncate has exactly the same semantics.
  3. update mode: fail. The current streaming framework can't propagate the update keys, so v2 sinks are not able to implement update mode. In the future we can introduce a SupportsUpdate trait. (A rough sketch of this mapping follows the list.)
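As a rough, hedged illustration of the mapping above, here is a minimal self-contained sketch in Scala. The traits below are simplified stand-ins named after the interfaces mentioned in this PR (StreamingWrite, WriteBuilder, SupportsTruncate), not the actual Spark classes, and the translation function is an assumption about how the engine side could look, not the code in this patch.

```scala
// Simplified stand-ins for the DSv2 write interfaces named in this PR.
object OutputModeTranslationSketch {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Complete extends OutputMode
  case object Update extends OutputMode

  trait StreamingWrite
  trait WriteBuilder { def buildForStreaming(): StreamingWrite }
  // Mix-in a sink's builder can implement when it is able to replace all existing data.
  trait SupportsTruncate extends WriteBuilder { def truncate(): WriteBuilder }

  // Hypothetical engine-side translation: output modes become builder operations.
  def createStreamingWrite(builder: WriteBuilder, mode: OutputMode): StreamingWrite = mode match {
    // Append: the default build path already appends data.
    case Append => builder.buildForStreaming()
    // Complete: truncate all old data, then append the new result of the current epoch.
    case Complete => builder match {
      case t: SupportsTruncate => t.truncate().buildForStreaming()
      case _ => throw new IllegalArgumentException("Sink does not support Complete mode")
    }
    // Update: the update keys cannot be propagated to v2 sinks yet, so fail.
    case Update => throw new UnsupportedOperationException("Update mode is not supported by v2 sinks")
  }
}
```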

The behavior changes:

  1. all the v2 sinks (foreach, console, memory, kafka, noop) no longer support update mode. The fact is, previously all the v2 sinks implemented update mode incorrectly; none of them could really support it.
  2. the kafka sink no longer supports complete mode. The fact is, the kafka sink can only append data.

How was this patch tested?

existing tests

@cloud-fan cloud-fan changed the title [SPARK-26956][SQL] translate streaming output mode to write operators [SPARK-26956][SQL] remove streaming output mode from data source v2 APIs Feb 21, 2019

case Update =>
// Although no v2 sinks really support Update mode now, during tests we do want them
// to pretend to support Update mode, treating it the same as Append mode.
Contributor Author

This is very useful when testing with aggregate/join. We don't want to complicate the test cases using watermarks, and we can't use complete mode as some sinks don't support it.
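A hedged, self-contained sketch of what such a test-only fallback could look like; the flag name and the structure are assumptions for illustration, not the actual condition used in this PR:

```scala
// Test-only behavior: pretend Update mode is supported by treating it like Append,
// so aggregate/join tests can run without watermarks or Complete-mode support.
object UpdateModeTestFallbackSketch {
  trait StreamingWrite
  trait WriteBuilder { def buildForStreaming(): StreamingWrite }

  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode

  def create(builder: WriteBuilder, mode: OutputMode, inTestMode: Boolean): StreamingWrite =
    mode match {
      case Append => builder.buildForStreaming()
      // In tests, treat Update exactly like Append.
      case Update if inTestMode => builder.buildForStreaming()
      case Update =>
        throw new UnsupportedOperationException("Update mode is not supported by v2 sinks")
    }
}
```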

@cloud-fan
Contributor Author

cc @jose-torres @rdblue

@SparkQA

SparkQA commented Feb 21, 2019

Test build #102581 has finished for PR 23859 at commit 32615df.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

Do we have any docs describing the background/discussion around this change? It doesn't sound like a small change and it directly impacts Structured Streaming, so I would like to fully understand it.

  1. complete mode: call SupportsTruncate#truncate. Complete mode means truncating all the old data and appending new data, and SupportsTruncate has exactly the same semantics.

Looks like we will have only new data then. Is that correct, or should it be fixed to append all data instead of only new data?

  1. update mode: fail. The current streaming framework can't propagate the update keys, so v2 sinks are not able to implement update mode. In the future we can introduce a SupportsUpdate trait.

I guess this means we don't separate keys and values when passing data to the sink, so the sink cannot perform an upsert (though the target system can still upsert if it knows about keys and values). So SupportsUpdate will get keys and values separately. Do I understand correctly?
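Purely as a hypothetical illustration of that guess (no such trait exists in this PR; the method name and signature below are assumptions):

```scala
// Hypothetical shape of a SupportsUpdate mix-in, sketched only to illustrate the question
// above: the engine would tell the sink which columns form the update key so that the
// sink can perform an upsert. Nothing like this is defined in this PR.
trait WriteBuilder
trait SupportsUpdate extends WriteBuilder {
  // Columns that uniquely identify the rows to be replaced (the "update keys").
  def update(keyColumns: Array[String]): WriteBuilder
}
```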

@arunmahadevan
Contributor

Is there any design doc on what we plan to support in the new model? If redesigning, it might be worth re-evaluating the need for these different modes before proceeding with the implementation.

For instance the Streaming sinks cannot correctly handle the updates without support for retractions.

And I'm not sure how the truncate is going to be used. Do we expect the sink to completely truncate the output and fill it with the new values for each micro-batch? I am not sure how that's very useful other than maybe in the console sink.

I think ideally what we want is for an operator to emit a stream of values (and retractions) to the sink so that the sink can correctly process and update the results. So maybe the different modes are not needed other than for backward-compatibility reasons.

@cloud-fan
Contributor Author

@HeartSaVioR this proposal was first discussed in a data source v2 community meeting, and then in this doc.

The most recent discussion happened in this PR.

In general, Update mode is not supported right now. The streaming framework is not able to propagate the "update key" to the sink. But users can look at the query manually and hardcode the "update key" in their custom sink, so we keep it in streaming data source v1, but not in v2, as it's too hacky.

@arunmahadevan you are right that the modes other than Append are not very useful and are mostly used for debugging. However, Complete mode has very clear semantics (truncate all the old data and write the new data), and we already have the SupportsTruncate trait that matches those semantics, so we still support it.
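As a hedged sketch of how a sink could honor that contract, here is a toy in-memory sink whose builder implements a truncate operation. The traits are simplified stand-ins, not Spark's real interfaces, and the buffer-based sink is purely illustrative:

```scala
// Toy sink: in Complete mode the engine calls truncate() before building the write,
// so each epoch replaces the whole buffer; otherwise the epoch's rows are appended.
object TruncatableMemorySinkSketch {
  trait StreamingWrite { def commit(epochId: Long, rows: Seq[String]): Unit }
  trait WriteBuilder { def buildForStreaming(): StreamingWrite }
  trait SupportsTruncate extends WriteBuilder { def truncate(): WriteBuilder }

  class MemoryWriteBuilder(buffer: scala.collection.mutable.ArrayBuffer[String])
      extends SupportsTruncate {
    private var needTruncate = false

    override def truncate(): WriteBuilder = { needTruncate = true; this }

    override def buildForStreaming(): StreamingWrite = new StreamingWrite {
      override def commit(epochId: Long, rows: Seq[String]): Unit = {
        if (needTruncate) buffer.clear() // Complete mode: drop all old data first
        buffer ++= rows                  // then append the new result of this epoch
      }
    }
  }
}
```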

@cloud-fan
Contributor Author

retest this please

@HeartSaVioR
Contributor

HeartSaVioR commented Feb 22, 2019

I'm sorry, but I can't find any mention of complete mode in either the doc or the PR you linked.

Btw, here's the explanation of Complete Mode from the Structured Streaming programming guide (http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html):

Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

I'd understand this semantic as: the sink always receives the entire result table at every batch. Your explanation of complete mode sounds like it works differently. I guess this is due to the fact that we can't expect upsert to work well with sinks, so we have to truncate everything, but then shouldn't we provide the updated entire result table, including old data, instead of providing only new data? Maybe there's confusion regarding the definition of "new data".

Previously I would have assumed the final result always contains every row the query outputs. I would like to be clear whether we keep this as it is, or let the sink decide.

@cloud-fan
Contributor Author

Sorry I didn't quote the right comment: https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?disco=AAAACdk332A

Let me just copy the comment here:

because we can just implement complete mode with overwrite (or alternatively truncate + append). The append mode is just the normal write append. We can figure out how to support the delta mode later when we add them, since there is no source implementation for it now.

I'd understand this semantic as: the sink always receives the entire result table at every batch. Your explanation of complete mode sounds like it works differently.

What is the difference? The sink always receives the entire result table at every batch, and is asked to remove all the old data. That's the expected end-to-end behavior of complete mode.

@HeartSaVioR
Contributor

HeartSaVioR commented Feb 22, 2019

OK, got it. It turns out the meaning of "new data" was not clear to me (I confused it with the updated rows in the current batch). Thanks for making it clear! It might be better to clarify "new data" as "the updated entire result table" in the PR description.

@SparkQA

SparkQA commented Feb 22, 2019

Test build #102618 has finished for PR 23859 at commit 32615df.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan cloud-fan changed the title [SPARK-26956][SQL] remove streaming output mode from data source v2 APIs [SPARK-26956][SS] remove streaming output mode from data source v2 APIs Feb 22, 2019
@SparkQA

SparkQA commented Feb 22, 2019

Test build #102651 has finished for PR 23859 at commit 1299715.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Feb 27, 2019

Test build #102795 has finished for PR 23859 at commit 1299715.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

cc @rdblue

@rdblue
Contributor

rdblue commented Mar 2, 2019

+1

Thanks for working on this, @cloud-fan! It looks good to me.

Member

@gatorsmile gatorsmile left a comment

Thanks! Merged to master.

@gatorsmile gatorsmile closed this in 382d5a8 Mar 4, 2019
mccheah pushed a commit to palantir/spark that referenced this pull request May 15, 2019
Closes apache#23859 from cloud-fan/update.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
@krishari2020

@cloud-fan Then what happens to sorting operations, which can run only in complete mode? By truncating the data accumulated in previous batches, does it imply we will apply sort operations only within the data available in the current micro-batch? If my assumption is right, aren't we going back to the DStream behaviour of applying window transformations over the batch interval?

@cloud-fan
Contributor Author

the "new data" mentioned in this PR means the new result of running the query with the entire input, not a single micro-batch. This is the semantic of complete mode AFAIK.

@HeartSaVioR
Contributor

HeartSaVioR commented Jan 14, 2020

Btw, is this concern based on a real-world workload? I cannot imagine "complete mode" working with a decent amount of traffic, especially if you're running the query for a long time. "Complete mode" means you cannot evict any state regardless of the watermark, which doesn't make sense unless you have a finite set of group keys (in which case the cardinality of the group keys defines the overall size of the state).

If my assumption is right, aren't we going back to the DStream behaviour of applying window transformations over the batch interval?

That's why "state" comes into play in structured streaming. The state retains the values across micro-batches, "windows" in case of window transformations.

In fact, as previous comments in this PR already stated, the only mode that works without any tweaks in production is append mode. In update mode you can make it work with a custom sink that correctly upserts the output, but there's no API to define "group keys" in existing sinks.
(Even if we had defined group keys on the sink side, it would be odd if we could not guarantee they are the same keys used to group the data in the stateful operation; that would be semantically incorrect.)

Btw, the streaming output mode is all about how to emit output for a stateful operation. If you don't do any stateful operation, the output mode is a no-op.
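To illustrate that point with the public user-facing API (this is standard DataStreamWriter usage, not the DSv2 internals under discussion): output mode only changes behavior once a stateful operation such as an aggregation is involved.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("output-mode-illustration").getOrCreate()
val raw = spark.readStream.format("rate").load() // columns: timestamp, value

// Stateless query: no aggregation, so there is no state to re-emit and
// append mode simply writes each micro-batch's rows once.
raw.writeStream
  .outputMode("append")
  .format("console")
  .start()

// Stateful query: groupBy/count keeps state across micro-batches, and the output
// mode decides what is emitted. "complete" rewrites the whole result table every
// trigger, which is why the sink must be able to truncate its old output.
val counts = raw.groupBy("value").count()
counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
```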

@tdas
Contributor

tdas commented May 12, 2020

@cloud-fan Just to be clear on the impact of this change: v2 sources like the foreach sink will not be able to use update mode? If so, that means existing streaming queries that used foreach to write update-mode output (i.e. the changed rows in each micro-batch of an aggregation query) to arbitrary sources won't be able to run on Spark 3.0?

In addition, I have the same question for Kafka. If someone could run a streaming query writing to Kafka in update mode in Spark 2.4, will it still work in Spark 3.0 under default settings?

@HeartSaVioR
Contributor

I guess this change originated from #23702. If I understand correctly, the precondition of this change was that we would bring in an API for update (strictly speaking, upsert) mode, but that doesn't seem to have happened. As of now, this change would discontinue some use cases in which the owners of the query want to deal with upsert manually, as @marmbrus pointed out in #23702.

That goes beyond the concern about breaking existing queries, because there's also another perspective on streaming output mode: the latency of output, one of the core points in streaming. Users may be willing to deal with upsert by themselves (even though Spark doesn't support it natively) if their use case is critical about reflecting updated information on time. They can deal with it explicitly via the foreach sink (or a custom sink that handles the upsert), or implicitly via a storage-side feature like a compacted topic.

Even if we'd like to tolerate the breaking change to buy some time for a better design of the update API, the change should be covered in the release notes and the Structured Streaming migration guide, and the Structured Streaming programming guide should be modified as well.

@cloud-fan
Contributor Author

If the same query works in 2.4 but not 3.0, it's a breaking change, and we should treat it seriously.

I'm fixing it in #28523

@rdblue
Contributor

rdblue commented May 13, 2020

-1 for adding this back. The update mode is ambiguous and doesn't tell the source which rows to replace in the output. When we originally removed it, there were no sources that actually implemented it. What is the source that has a behavior change? Are we sure that it is due to this?

@marmbrus
Contributor

marmbrus commented May 13, 2020

I think you mean sinks. Any source can be read from and then processed in update mode.

I know of a lot of production use cases that use the foreach sink in update mode. Similarly, I think it is pretty easy to understand how it works with console, or even when writing to a compacted topic in Kafka, given that the user specifies the right key.

I'm also not sure I would say it's "ambiguous". For any given query it's clear what the key is; we just don't have machinery to automatically indicate it to the sink. The lack of that machinery, though, does not seem like a good reason to break cases that were working just fine before 3.0.
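For instance, a user-maintained upsert via the foreach sink might look roughly like the sketch below. The ForeachWriter API is the real public one, but the KeyValueStore client and the assumption that column 0 is the grouping key are illustrative; the point is that the user, not the engine, decides what the key is.

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical external client; stands in for whatever store the user upserts into.
trait KeyValueStore extends Serializable {
  def put(key: String, value: Long): Unit
  def close(): Unit
}

class UpsertForeachWriter(newStore: () => KeyValueStore) extends ForeachWriter[Row] {
  private var store: KeyValueStore = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    store = newStore()
    true // accept this partition/epoch
  }

  // In update mode, each changed aggregate row arrives here; the user hardcodes the
  // knowledge that column 0 is the grouping key and column 1 is the aggregated count.
  override def process(row: Row): Unit =
    store.put(row.getString(0), row.getLong(1))

  override def close(errorOrNull: Throwable): Unit =
    if (store != null) store.close()
}

// Usage, assuming `counts` is a streaming aggregation keyed by a string column:
// counts.writeStream.outputMode("update").foreach(new UpsertForeachWriter(makeStore)).start()
```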

@rdblue
Contributor

rdblue commented May 13, 2020

I'm not suggesting we break for 3.0. I just don't think that it's a good idea to compromise the DSv2 API for this reason -- if we don't communicate what gets replaced then there is no way to have reliable behavior. I think @tdas has suggested a good work-around, so let's go with that.

@HeartSaVioR
Contributor

(Just my 2 cents: we'd better not use "source" as a shortcut for "data source", since it causes confusion with "source" in source/sink.)
