[SPARK-26356][SQL] remove SaveMode from data source v2 #24233
Conversation
assert(!new File(sessionPath).exists)
checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
}
}
}
}

test("SPARK-25700: do not read schema when writing in other modes except append and overwrite") {
This test is not needed anymore, as `ErrorIfExists` and `Ignore` modes are unsupported now.
Test build #104036 has finished for PR 24233 at commit
Test build #104037 has finished for PR 24233 at commit
retest this please
Test build #104040 has finished for PR 24233 at commit
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.sources.{AlwaysTrue, Filter}
import org.apache.spark.sql.sources.v2.SupportsWrite
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsTruncate, WriteBuilder, WriterCommitMessage}
Nit: import org.apache.spark.sql.sources.v2.writer._
I don't agree with this.
Wildcard imports make it difficult to cherry-pick commits and increase conflicts. They also make it difficult to see where symbols come from, and they pollute the namespace with everything in a package instead of just the required names.
For example, I recently hit problems adding a `logical` package for expressions because of places that imported `expressions._` along with `plans._`.
+1 on the proposal.
private[noop] object NoopWriteBuilder extends WriteBuilder
  with SupportsSaveMode with SupportsTruncate {
  override def mode(mode: SaveMode): WriteBuilder = this
private[noop] object NoopWriteBuilder extends WriteBuilder with SupportsTruncate {
Since DSv2 (except file sources) can now only write to an existing table, the write path of `NoopDataSource` will still fail here (analyzer rule `ResolveOutputRelation`).
Test build #104056 has finished for PR 24233 at commit
Test build #104061 has finished for PR 24233 at commit
Test build #104071 has finished for PR 24233 at commit
retest this please.
Test build #104083 has finished for PR 24233 at commit
Retest this please.
Test build #104132 has finished for PR 24233 at commit
retest this please
Test build #104150 has finished for PR 24233 at commit
table.resolved && query.resolved && {
  // When the table schema is empty, skip the schema check.
  // TODO: introduce a table capability to disable schema check.
  table.schema.isEmpty || {
Why is this necessary for removing `SaveMode`? It seems unrelated.
I think that we should add the table capability instead of adding this check. What does this unblock?
The noop data source reports its schema as empty, but can accept an input data set of any schema.
Previously, `SupportsSaveMode` was a special case and we skipped the schema check for it. Now only the file source is a special case, and the schema check is applied to the noop data source as well.
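To make that concrete, here is a hedged usage sketch (assuming an existing `SparkSession` named `spark` and the built-in `noop` format): the sink declares no schema, yet inputs of different shapes are all accepted once the schema check is skipped for empty-schema tables.

```scala
// Hedged illustration only: the noop sink reports an empty schema, so with the
// empty-schema exemption both of these writes pass output resolution and succeed.
spark.range(10)
  .write.format("noop").mode("append").save()

spark.range(10)
  .selectExpr("id", "id * 2 AS twice", "CAST(id AS STRING) AS s")
  .write.format("noop").mode("append").save()
```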
It sounds like this commit depends on adding a capability to turn off schema validation. That commit should go in first to avoid adding code that we have to remember to revert later.
@@ -24,7 +24,7 @@

/**
 * The base interface for v2 data sources which don't have a real catalog. Implementations must
 * have a public, 0-arg constructor.
 * have a public, 0-arg constructor, and can only deal with existing tables.
The phrase "deal with" is not very specific. I think it would be better to say what these tables can be used for: data operations like read, append, delete, and overwrite. Not operations that require metadata changes.
// the save modes, which violates the semantic of `TableProvider`. Here we special-case
// file source and pass the save mode to file source directly. This hack can be removed
// after we figure out a general interface for path-based data sources.
case table: FileTable =>
Why is this hack necessary? Why not put off v2 support for path-based tables?
Because we want to migrate file source to v2, to validate the API of `Table`, `ScanBuilder`, `WriteBuilder`, etc.
This hack is just to work around the issue that we do not have a proper entry API for path-based data source, which I believe we will have later on.
I think this is not a bad idea. We can unblock the file source migration, and we can keep DS v2 clean (`FileTable` is an internal class). Besides, this hack will be removed once we have a path-based API in DS v2.
File sources do not use v2 by default, so this is not a necessary change for this commit. I think it should be in a separate PR. We can discuss whether it is a good idea to add this hack in the next DSv2 sync.
Please remove it from this PR.
I'm adding this to my agenda for the sync-up.
I believe that we agreed in the sync-up to remove this hack. Is there a reason why it is still included?
I explained in the comment: #24233 (comment)
Removing the hack breaks several tests and I'd like to do it in another PR. `SupportsSaveMode` is a hack, which means we already have a hack for file source v2 in `DataFrameWriter.save`. To keep the PR small, I think it's better to keep the existing hack (it's not creating a new one) and open a new PR to remove it and update the tests.
@@ -54,7 +56,7 @@ class NoopSuite extends SharedSQLContext {
  accum.add(1)
  x
}
.write.format("noop").save()
.write.format("noop").mode("append").save()
Can we change the default mode for v2 to append if no mode is set? That seems more reasonable than always requiring `mode("append")`.
Personally I support it, but I'd like to do it in another PR, as we expect to have discussions about backward compatibility for changing the default mode.
Okay, then I think this PR depends on the PR to make append the default mode for v2. Please get that in first.
Otherwise, this PR will make unnecessary changes to several places and we will have to roll those changes back to make append the default mode.
What if we end up not changing the default mode (e.g. the community doesn't agree with it)? Do you think that's a blocker for forbidding `ErrorIfExists` and `Ignore` modes for `TableProvider`?
No, I don't think so. We would just have to make changes like this one. My motivation is to avoid making these changes because this touches several files just to set the mode to append.
We can't change the default like this. It'd be a huge nightmare for users to upgrade, because it's a subtle semantic change. If we want to piss off all Spark users that write data pipelines, changing the default from one to another here is a great way to achieve that.
We can however introduce new APIs to make it more obvious what's happening.
The proposal is only to change the default when using v2. If we don't change the default for v2, then the default is to always fail, which I think is worse.
Test build #104186 has finished for PR 24233 at commit
Test build #105498 has finished for PR 24233 at commit
* </ul>
* <p>
* When writing to data source v1, the default option is `ErrorIfExist`. When writing to data
* source v2, the default option is `Append`.
I think @rxin objected to changing the default mode for DSv2. Has that changed?
I think that this is a good idea because the most sensible default for v2 is to append if no mode is set, instead of failing because the mode is not supported. This doesn't change v1 behavior, so I think it is okay.
I think @rxin has a concern about breaking existing ETL pipelines. Now we only change the default mode for DS v2 (excluding file sources), so I think there is no compatibility issue.
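A hedged sketch of the documented defaults (the format names below are hypothetical placeholders and `spark` is an existing `SparkSession`): with no explicit mode, a v1 source keeps the old `ErrorIfExists` behavior, while a non-file v2 `TableProvider` source appends.

```scala
val df = spark.range(5).toDF("id")

// v1 source: no mode is set, so the write behaves as SaveMode.ErrorIfExists (unchanged).
df.write.format("com.example.LegacyV1Source").option("path", "/tmp/v1-out").save()

// Non-file v2 TableProvider source: no mode is set, so the write behaves as SaveMode.Append.
df.write.format("com.example.SimpleV2Provider").option("path", "/tmp/v2-out").save()
```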
@@ -219,14 +219,14 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)

spark.range(10).select('id as 'i, -'id as 'j).write.format(cls.getName)
  .option("path", path).save()
  .option("path", path).mode("append").save()
I don't think this change is needed because the default mode for v2 is append.
Before my PR, we wrote the files twice: once without the save mode, once with append mode.
Now I switch the order, to make sure that the second write doesn't specify a save mode, which proves the default mode is append.
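Roughly, the reordered test looks like the sketch below (a hedged reconstruction from the quoted diff, where `cls` and `path` stand in for the suite's test class and temporary path): the first write pins the mode explicitly, and the second omits it to exercise the new v2 default.

```scala
import spark.implicits._  // for the 'symbol column syntax used in the original test

// First write: explicit append mode.
spark.range(10).select('id as 'i, -'id as 'j)
  .write.format(cls.getName).option("path", path).mode("append").save()

// Second write: no mode specified, relying on the v2 default of Append.
spark.range(10).select('id as 'i, -'id as 'j)
  .write.format(cls.getName).option("path", path).save()
```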
Note that, file source v2 is still supported in … I think this is out of the scope of this PR, which aims to remove the …
@cloud-fan, I don't recall that conclusion from a sync. Can you quote from the notes that you're talking about? I'm fine fixing this in a follow-up, as long as there's a blocker filed so that this doesn't go into the 3.0 release.
Let me quote something from your meeting notes:
At that time, I didn't realize that we do have test cases relying on file source v2 writing. That's why I think it's better to do it in another PR and update the tests. I've created a ticket to track it: https://issues.apache.org/jira/browse/SPARK-27815
@cloud-fan, I don't think that issue is accurate. The problem is that there is a hack to continue passing SaveMode into some sources. That's what needs to be removed. Please update the issue to be more clear and mark it as a blocker, then I think we can move forward with this PR.
@cloud-fan, thanks for fixing the issue. I think this is ready to go. +1 from me.
Hi @dongjoon-hyun, do you have time to take a look at this please? Thanks!
Retest this please.
@@ -268,9 +269,26 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
provider.getTable(dsOptions) match {
  // TODO: for backward compatibility reasons, the builtin file source needs to support all
Can we have an IDed TODO? `TODO:` -> `TODO(SPARK-XXX):`?
@@ -268,9 +269,26 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
provider.getTable(dsOptions) match {
  // TODO: for backward compatibility reasons, the builtin file source needs to support all
  // the save modes, which violates the semantic of `TableProvider`. Here we special-case
nit: `Here we special-case`?
* </ul>
* <p>
* When writing to data source v1, the default option is `ErrorIfExist`. When writing to data
nit: `ErrorIfExist` -> `ErrorIfExists`
@@ -78,15 +80,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 * @since 1.4.0
 */
def mode(saveMode: String): DataFrameWriter[T] = {
  this.mode = saveMode.toLowerCase(Locale.ROOT) match {
  mode(saveMode.toLowerCase(Locale.ROOT) match {
    case "overwrite" => SaveMode.Overwrite
    case "append" => SaveMode.Append
    case "ignore" => SaveMode.Ignore
    case "error" | "errorifexists" | "default" => SaveMode.ErrorIfExists
This should be changed because "default" is not `ErrorIfExists` for DSv2.
Could you add test coverage for this? It seems that we don't have it yet.
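One possible shape for such a test (a hedged sketch reusing the `org.apache.spark.sql.test` v1 source and the `LastOptions` helper that the existing suite provides): `mode("default")` on the v1 path should still resolve to `ErrorIfExists`.

```scala
// Hedged sketch of the requested coverage for the v1 path.
spark.range(10).write
  .format("org.apache.spark.sql.test")
  .mode("default")
  .save()
assert(LastOptions.saveMode === SaveMode.ErrorIfExists)
```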
Thank you for pinging me, @cloud-fan. Mostly, it looks good to me. I have only one concern about …
Test build #105749 has finished for PR 24233 at commit
@@ -26,6 +26,9 @@
 * The base interface for v2 data sources which don't have a real catalog. Implementations must
 * have a public, 0-arg constructor.
 * <p>
 * Note that, TableProvider can only apply data operations to existing tables, like read, append,
 * delete, and overwrite. Not operations that require metadata changes, like create/drop tables.
`Not operations` -> `It does not support the operations`
LGTM
@dongjoon-hyun good catch about …
Test build #105761 has finished for PR 24233 at commit
case "append" => mode(SaveMode.Append) | ||
case "ignore" => mode(SaveMode.Ignore) | ||
case "error" | "errorifexists" => mode(SaveMode.ErrorIfExists) | ||
case "default" => this |
+1
Thanks! Merged to master.
.format("org.apache.spark.sql.test") | ||
.mode("default") | ||
.save() | ||
assert(LastOptions.saveMode === SaveMode.ErrorIfExists) |
Thanks!
+1, LGTM, too.
In data source v1, save mode specified in `DataFrameWriter` is passed to data source implementation directly, and each data source can define its own behavior about save mode. This is confusing and we want to get rid of save mode in data source v2. For data source v2, we expect data source to implement the `TableCatalog` API, and end-users use SQL (or the new write API described in [this doc](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5ace0718#heading=h.e9v1af12g5zo)) to access data sources. The SQL API has very clear semantics and we don't need save mode at all. However, for simple data sources that do not have table management (like a JIRA data source, a noop sink, etc.), it's not ideal to ask them to implement the `TableCatalog` API, and throw exception here and there. `TableProvider` API is created for simple data sources. It can only get tables, without any other table management methods. This means, it can only deal with existing tables. `TableProvider` fits well with `DataStreamReader` and `DataStreamWriter`, as they can only read/write existing tables. However, `TableProvider` doesn't fit `DataFrameWriter` well, as the save mode requires more than just get table. More specifically, `ErrorIfExists` mode needs to check if table exists, and create table. `Ignore` mode needs to check if table exists. When end-users specify `ErrorIfExists` or `Ignore` mode and write data to `TableProvider` via `DataFrameWriter`, Spark fails the query and asks users to use `Append` or `Overwrite` mode. The file source is in the middle of `TableProvider` and `TableCatalog`: it's simple but it can check table(path) exists and create table(path). That said, file source supports all the save modes. Currently file source implements `TableProvider`, and it's not working because `TableProvider` doesn't support `ErrorIfExists` and `Ignore` modes. Ideally we should create a new API for path-based data sources, but to unblock the work of file source v2 migration, this PR proposes to special-case file source v2 in `DataFrameWriter`, to make it work. This PR also removes `SaveMode` from data source v2, as now only the internal file source v2 needs it. existing tests Closes apache#24233 from cloud-fan/file. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: gatorsmile <gatorsmile@gmail.com>
What changes were proposed in this pull request?

In data source v1, the save mode specified in `DataFrameWriter` is passed to the data source implementation directly, and each data source can define its own behavior about save mode. This is confusing and we want to get rid of save mode in data source v2.

For data source v2, we expect data sources to implement the `TableCatalog` API, and end-users use SQL (or the new write API described in [this doc](https://docs.google.com/document/d/1gYm5Ji2Mge3QBdOliFV5gSPTKlX4q1DCBXIkiyMv62A/edit?ts=5ace0718#heading=h.e9v1af12g5zo)) to access data sources. The SQL API has very clear semantics and we don't need save mode at all.

However, for simple data sources that do not have table management (like a JIRA data source, a noop sink, etc.), it's not ideal to ask them to implement the `TableCatalog` API and throw exceptions here and there. The `TableProvider` API is created for simple data sources. It can only get tables, without any other table management methods. This means it can only deal with existing tables.

`TableProvider` fits well with `DataStreamReader` and `DataStreamWriter`, as they can only read/write existing tables. However, `TableProvider` doesn't fit `DataFrameWriter` well, as the save mode requires more than just getting a table. More specifically, `ErrorIfExists` mode needs to check if the table exists and create the table; `Ignore` mode needs to check if the table exists. When end-users specify `ErrorIfExists` or `Ignore` mode and write data to a `TableProvider` via `DataFrameWriter`, Spark fails the query and asks users to use `Append` or `Overwrite` mode.

The file source is in the middle of `TableProvider` and `TableCatalog`: it's simple, but it can check whether a table (path) exists and create a table (path). That said, the file source supports all the save modes.

Currently the file source implements `TableProvider`, and it's not working because `TableProvider` doesn't support `ErrorIfExists` and `Ignore` modes. Ideally we should create a new API for path-based data sources, but to unblock the work of file source v2 migration, this PR proposes to special-case file source v2 in `DataFrameWriter` to make it work.

This PR also removes `SaveMode` from data source v2, as now only the internal file source v2 needs it.

How was this patch tested?

existing tests
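For illustration, a hedged sketch of the user-facing behavior summarized above (the format name is a hypothetical `TableProvider`-only source and `spark` is an existing `SparkSession`): append and overwrite go through, while the modes that need an existence check are rejected.

```scala
val df = spark.range(10).toDF("id")

// Supported: these only need to get the existing table and write to it.
df.write.format("com.example.SimpleProvider").mode("append").save()
df.write.format("com.example.SimpleProvider").mode("overwrite").save()

// Rejected: ErrorIfExists and Ignore would require checking table existence
// (and possibly creating the table), which a TableProvider cannot do, so Spark
// fails the query and asks for append or overwrite instead.
df.write.format("com.example.SimpleProvider").mode("ignore").save()
df.write.format("com.example.SimpleProvider").mode("errorifexists").save()
```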
existing tests