[SPARK-22026][SQL] data source v2 write path #19269
Conversation
Test build #81891 has finished for PR 19269 at commit
Force-pushed from 7b6a6b7 to 862a679
* tasks.
* 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If
*    all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If
*    some partitions failed and aborted, call {@link #abort()}.
The main reason why I wanted a separate SPIP for the write path was this point in the doc:
Ideally partitioning/bucketing concept should not be exposed in the Data Source API V2, because they are just techniques for data skipping and pre-partitioning. However, these 2 concepts are already widely used in Spark, e.g. DataFrameWriter.partitionBy and DDL syntax like ADD PARTITION. To be consistent, we need to add partitioning/bucketing to Data Source V2, so that the implementations can be able to specify partitioning/bucketing for read/write.
There's a lot in there that's worth thinking about and possibly changing:
1. Ideally, the DataSourceV2 API wouldn't support bucketing/partitioning
2. The current DataFrameWriter API is what we should continue to support
3. Implementations should supply bucketing and partitioning for writes because of 2
Bucketing/partitioning: It comes down to the level at which this API is going to be used. It looks like this API currently ignores bucketing and partitioning (unless my read through was too quick). I think I agree that in the long term that's a good thing, but we need ways for a data source to tell Spark about its requirements for incoming data.
In the current version, it looks like Spark would know how to prepare data for writers outside of this API (rather than including support as suggested by point 3). When writing a partitioned table, Spark would get the partitioning from the table definition in the metastore and automatically sort by partition columns. Is that right?
I'd like to move the data store's requirements behind this API. For example, writing to HBase files directly requires sorting by key first. We don't want to do the sort in the writer because it may duplicate work (and isn't captured in the physical plan), and we also don't want to require Spark to know about the requirements of the HBase data store, or any other specific implementation.
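(For illustration only, here is a rough Scala sketch of that idea; nothing like this interface exists in this PR, and the trait/method names are invented.)

```scala
// Hypothetical sketch: a writer declares the clustering/ordering it needs,
// and Spark's planner inserts the sort before the write, instead of the data
// source sorting inside its own writer (which would hide work from the plan).
trait SupportsRequiredOrdering {
  // e.g. an HBase-backed writer would return its row-key columns here
  def requiredOrdering: Seq[String]
}
```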
DataFrameWriter API: I'd like to talk about separating the API for table definitions and writes, but not necessarily as part of this work. The SPIP should clearly state whether that's part of the scope for this API, and how the proposed API works for both if that work is going to be done in a future proposal. A good example of that is how bucketing/partitioning will be handled.
I've sent an email to dev-list, let's discuss there.
Test build #81930 has finished for PR 19269 at commit
* string-to-string map.
* @return a reader that implements the actual read logic.
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
It would make this much easier to review if changes to the read path were taken out and committed in a follow-up to #19136.
* Creates a write task which will be serialized and sent to executors. For each partition of the
* input data(RDD), there will be one write task to write the records.
*/
WriteTask<Row> createWriteTask();
I think it's confusing to have only one write "task" that is serialized and used everywhere. It is implicitly copied by the serialization into multiple distinct tasks. Is there a better name for it? Maybe call the DataWriter the WriteTask and serialize something with a better name?
How about WriteTaskFactory and WriteTask? One benefit of the current naming is that it's consistent with the read path.
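(As a sketch of that proposed split, with parameter lists simplified; the names that actually ended up merged in this PR are DataWriterFactory and DataWriter, per the description at the bottom of the thread.)

```scala
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// Sketch of the proposed naming: the serialized piece is only a factory,
// and each task builds its own per-partition writer from it.
trait WriteTaskFactory[T] extends Serializable {
  def createWriteTask(partitionId: Int, attemptNumber: Int): WriteTask[T]
}

trait WriteTask[T] {
  def write(record: T): Unit
  def commit(): WriterCommitMessage
  def abort(): Unit
}
```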
* The writing procedure is:
* 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the
*    partitions of the input data(RDD).
* 2. For each partition, create a data writer with the write task, and write the data of the
How does this handle speculative execution? This description makes it sound like attempts are only run serially. I'd like to have an interface that signals support for concurrent tasks, for data sources that act like the direct committer and can't handle speculation.
Spark can't disable speculative tasks at runtime, so we can't support this feature.
I'll add more comments to say that data source writers have to deal with speculative tasks correctly.
Maybe Spark should be able to disable speculative tasks at runtime instead of requiring data sources to roll back. I'd prefer that, but I don't think it is something that needs to change now. We can always add an interface if/when it is supported that allows data sources to communicate a lack of support for speculation.
That doesn't really help if one of the tasks fails and gets relaunched.
Maybe you're right. If a source supports rollback for sequential tasks, then that might mean it has to support rollback for concurrent tasks. I was originally thinking of a case like JDBC without transactions. So an insert actually creates the rows and rolling back concurrent tasks would delete rows from the other task. But in that case, inserts are idempotent so it wouldn't matter. I'm not sure if there's a case where you can (or would) implement rolling back, but can't handle concurrency. Let's just leave it until someone has a use case for it.
/**
* Commits this writing job with a list of commit messages. The commit messages are collected from
* all data writers for this writing job and are produced by {@link DataWriter#commit()}. This
* also means all the data are written successfully and all data writers are committed.
I think this should state the guarantees when this method is called:
- One and only one attempt for every task has committed successfully
- The messages contain the commit message from every committed task attempt, which is no more than one per task
- All other attempts have been successfully aborted (is this a guarantee, or just that aborts have been attempted?)
/**
* Aborts this writing job because some data writers are failed to write the records and aborted.
*/
void abort();
Should this accept the commit messages for committed tasks, or will tasks be aborted?
I'm thinking of the case where you're writing to S3. Say a data source writes all attempt files to the final locations, then removes any attempts that are aborted. If the job aborts with some tasks that have already committed, then either this should have the option of cleaning up those files (passed in the commit message) or all of the tasks should be individually aborted. I'd prefer to have this abort clean up successful/committed tasks because the logic may be different.
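(To make the suggestion concrete, a hypothetical Scala sketch; the abort() in this PR currently takes no arguments, and S3CommitMessage/deleteQuietly are invented names.)

```scala
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

object S3AbortSketch {
  // Hypothetical commit message recording where a task wrote its final file.
  case class S3CommitMessage(finalPath: String) extends WriterCommitMessage

  // Hypothetical job-level abort handed whatever commit messages reached the
  // driver: it can then remove output that committed tasks already made visible.
  def abortJob(committed: Seq[WriterCommitMessage]): Unit = {
    committed.foreach {
      case S3CommitMessage(path) => deleteQuietly(path) // clean up committed output
      case _ =>                                         // nothing known to clean up
    }
  }

  // Hypothetical helper: best-effort delete, e.g. via an S3 client.
  def deleteQuietly(path: String): Unit = ()
}
```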
Let's say we have 400 partitions to write, and the first 100 write tasks are successful and committed. If the 101st write task fails, there is no way to re-launch the previous 100 write tasks and abort them; we can only ask DataSourceV2Writer to do a job-level abort.
So you are right, this method should also take a list of commit messages for already-successful write tasks.
After thinking about it more, we can't do this. A Spark job may crash without knowing which tasks succeeded and which failed. So the abort here should be able to handle already committed tasks.
Can you explain the situation you're talking about a bit more?
I think Spark should pass everything it can to the abort. I agree that the abort here should be flexible and a best-effort, but there are situations where the commit logic can't know how to roll back everything that tasks did without those commit messages. There may be cases where Spark can't guarantee whether a commit was complete or not, but where it can, it should pass those commit messages.
Say you had 100 successful write tasks of 400, but the executor died before it returned the 100th message (but after committing data) and that executor failure hit the max and Spark cancelled the job. Even though Spark has only 99 commit messages, it should still pass the ones that it can. In the S3 case, these are the only way the driver knows to delete the other 99 files. So the choice is between deleting 99 of 100 committed files, and leaving all 100 taking up space. I think rolling back 99 is much better than 0.
Knowing what's being committed can provide a bit more information as to what is going on, and could be appreciated for that. The biggest issue I fear is loss of the driver itself, so no abort() call is made. A strategy for cleaning up from that would be good, even though it's primarily one of bringing up a new writer and saying "clean up this mess a previous instance left". Looking at the S3 example, my strategy there is/would be: identify all pending commits, abort them, then clean up the dest dir.
@InterfaceStability.Evolving
public interface DataWriter<T> {

  void write(T record);
What happens if this throws an exception?
not allowed :)
We should specify all the exception behavior in all the APIs, not just this function.
@cloud-fan can you create a JIRA ticket for this to make sure we follow up on it?
I thought that might be the case. :)
But, we can't count on not throwing exceptions when writing so it would be good to also document what happens when a writer does throw an exception here.
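(For reference, the behaviour that ends up documented later in the thread is roughly the following Scala sketch: if write() throws, the task's writer is aborted, the remaining records of the partition are skipped, and the error is rethrown.)

```scala
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

object WriteTaskSketch {
  // Simplified sketch of the executor-side flow for one partition.
  def writePartition[T](iter: Iterator[T], writer: DataWriter[T]): WriterCommitMessage = {
    try {
      iter.foreach(writer.write)
      writer.commit()
    } catch {
      case t: Throwable =>
        writer.abort() // task-level cleanup; the job-level abort happens on the driver
        throw t
    }
  }
}
```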
import org.apache.spark.util.Utils

case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: LogicalPlan)
  extends RunnableCommand {
I know similar tasks do the same, but this should not implement RunnableCommand. I'm not sure of the original intent for it, but I think RunnableCommand should be used for small tasks that are carried out on the driver, like DDL.
Using RunnableCommand in cases like this where a job needs to run ends up effectively linking a logical plan into a physical plan, which has caused a few messy issues. For example, the problem where the Spark SQL tab doesn't show the entire operation and only shows the outer command without metrics.
RunnableCommand simply means it's both a logical plan and a physical plan.
We should fix the UI issue separately (which on its own is super annoying).
It isn't just the UI though. I've seen cases where InsertIntoHadoopFsRelationCommand is run inside a CreateDatasourceTableAsSelectCommand, and their enforcement of write modes depend on one another. Our internal S3 committer updates partition information and handles partition-level conflicts, but that requires that the table exists before the write (to check what partitions already exist). When we moved table creation in the CTAS command, it broke the insert into command, when these two should be separate.
While it's convenient to have a logical plan and a physical plan together, I think this ends up getting misused. That's why I'm advocating to change how we use RunnableCommand. To fix it, we should introduce a node with a command and logical plan, so we can optimize the entire plan and run the command at the right time.
Clearly, this isn't a blocker for this PR, I just want to mention that I see this pattern causing a lot of problems every time we pull in a new Spark version.
I've also observed this issue where the explain output of commands behaves differently than from logical plans, and have a repro at https://issues.apache.org/jira/browse/SPARK-22204
@InterfaceStability.Evolving
@Experimental
@InterfaceStability.Unstable
public interface SupportsWriteUnsafeRow extends DataSourceV2Writer {
Why do we need this? As in, what data source would want to write UnsafeRow out directly? It's some internal format.
Force-pushed from 862a679 to d06ef81
Test build #82249 has finished for PR 19269 at commit
* Note that some data writer may already be committed in this case, implementations should be
* aware of this and clean up the data.
*/
void abort();
This still needs to be passed the WriterCommitMessages for committed tasks. (My previous comment is gone now)
I replied on your previous comment: #19269 (comment)
The problem is that, if one writer commits and another writer aborts at the same time, maybe the aborting signal arrives at the driver first, and the driver side doesn't know there is a writer already committed.
Sorry I missed that!
People may know that I'm busy with some S3 committers which work with Hadoop MapReduce & Spark, with an import of Ryan's committer into the Hadoop codebase. This includes changes to s3a to support that and an alternative design which relies on a consistent S3, a Spark committer (not in the Spark codebase itself) to handle it there, plus the tests & documentation of how commits actually work. For that, the conclusion I've reached is: nobody really knows what's going on, and it's a miracle things work at all. FWIW, I think I now have what is the closest thing to documentation of what goes on at the Hadoop FS committer layer, based on code, tests, stepping through operations in an IDE, and other archaeology.
Based on that experience, I think a key deliverable ought to be some specification of what a committer does. I know there are bits in the javadocs like "Implementations should handle this case correctly", but without a definition of "correct" it's hard to point at an implementation and say "you've got it wrong". I would propose supplementing the code with
Some general questions related to this:
logInfo(s"Data source writer $writer committed.") | ||
} catch { | ||
case cause: Throwable => | ||
writer.abort() |
this may raise an exception too...better to use Utils.tryWithSafeFinallyAndFailureCallbacks()
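(For illustration, roughly what that buys over the bare try/catch is the hand-rolled sketch below; the real utility also does logging and task-failure bookkeeping, so treat this as approximate.)

```scala
import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, WriterCommitMessage}

object CommitOrAbortSketch {
  // If commit() fails, still run the job-level abort, but keep the commit
  // failure as the exception that propagates; an abort failure is attached
  // as a suppressed exception instead of masking the original error.
  def commitOrAbort(writer: DataSourceV2Writer, messages: Array[WriterCommitMessage]): Unit = {
    try {
      writer.commit(messages)
    } catch {
      case cause: Throwable =>
        try writer.abort() catch {
          case t: Throwable => cause.addSuppressed(t)
        }
        throw cause
    }
  }
}
```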
    return new JavaSimpleCSVReadTask(Collections.emptyIterator());
  }
} catch (IOException e) {
  throw new RuntimeException(e);
Why the translation?
Because IOException is a checked exception and needs to be declared in the method signature; this is just a test, so I wanna make it as simple as possible.
Does Spark have a RuntimeIOException? I typically like to use those so that the IOException can still be caught and handled by code that chooses to.
I see. I'd just declare testXYZ throws Throwable and not worry about what gets raised internally: saves on try/catch translation logic.
@Override
public void abort() {
  fileWriter.close();
if you are catching and wrapping exceptions, this should be included in the try/catch clause. Also handle fileWriter==null
if (Files.exists(Paths.get(path))) {
  if (mode == SaveMode.ErrorIfExists) {
    throw new RuntimeException("data already exists.");
I'd include the path here.
  }
}

static class JavaSimpleCSVDataWriteFactory implements DataWriteFactory<Row>, DataWriter<Row> {
If these test classes worked with a Hadoop FS, they'd not just be testable against the local file:// FS; you could bring up a MiniDFS cluster and see how that worked. Why? It gives a broader test of how well the API works against HDFS and the various failure modes, but also of how you pass configuration down.
val rdd = children.head.execute()
val messages = new Array[WriterCommitMessage](rdd.partitions.length)

logInfo(s"Start processing data source writer: $writer")
maybe add the # of partitions to the log; it helps provide a hint of how long it's going to take. If a job hangs, this'll be the last entry in the log, so it's good to be informative
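(A sketch of the suggested log line, reusing the rdd already in scope in the quoted code above.)

```scala
logInfo(s"Start processing data source writer: $writer. " +
  s"The input RDD has ${rdd.partitions.length} partitions.")
```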
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
  iter.foreach(dataWriter.write)
  dataWriter.commit()
good to log something here, at least @ debug
One other thing that would be good now, and invaluable in the future, is to let the writers provide real statistics about the cost of operations. If you look at the changes to FileFormatWriter to collect stats, it's just listing the written file after close() and returning file size as its sole metric. We are moving to instrumenting more of the Hadoop output streams with statistics collection, and, once there's an API for getting at the values, that would allow the driver to aggregate stats from the writers for the writes and the commits. Examples: bytes written, files written, records written, # of 503/throttle events sent back from S3, # of network failures and retried operations, etc. Once the writers start collecting this data, there's motivation for the layers underneath to collect more and publish what they get.
There's a side issue: what is the proposed mandated re-entrancy policy in the writers?
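(Going back to the statistics point above, a purely illustrative Scala sketch; nothing like this exists in the PR and the field names are invented.)

```scala
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// Hypothetical commit message carrying per-task write statistics that the
// driver could aggregate once the job commits.
case class StatsCommitMessage(
    path: String,          // where this task wrote its output
    bytesWritten: Long,
    recordsWritten: Long,
    throttleEvents: Long   // e.g. S3 503/throttle responses seen by the stream
  ) extends WriterCommitMessage

object StatsCommitMessage {
  // Driver-side aggregation across all task commit messages.
  def totals(messages: Seq[StatsCommitMessage]): (Long, Long) =
    (messages.map(_.bytesWritten).sum, messages.map(_.recordsWritten).sum)
}
```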
Hi @steveloughran, thanks for your comments! We should have a detailed specification of this framework, especially for the behaviors on the Spark side. I'll update soon.
Several things to discuss:
+1 for the ability to return statistics: the remote stores have lots of information which committers may return
Force-pushed from d06ef81 to 2d41e44
I'm not following what you mean here.
Agreed. We should state something about this in the abort job docs: "Commit messages passed to abort are the messages for all commits that succeeded and sent a commit message to the driver. It is possible, though unlikely, for an executor to successfully commit data to a data source, but fail before sending the commit message to the driver."
Test build #82571 has finished for PR 19269 at commit
}
if (mode == SaveMode.Overwrite) {
  if (fs.exists(path)) {
    fs.delete(path, true)
cc @steveloughran, overwrite needs this check, because we need to delete the root dir here. The writers always create files with unique names, so we can't delete old files while overwriting.
aah, but delete() doesn't throw any exception if the path doesn't exist, just returns false. And we have the tests to verify this. So the check is moot
Test build #82829 has finished for PR 19269 at commit
Test build #82844 has finished for PR 19269 at commit
Test build #82872 has finished for PR 19269 at commit
* the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will
* not be processed. If all records are successfully written, {@link #commit()} is called.
*
* If this data writer successes(all records are successfully written and {@link #commit()}
successes -> succeeds. You might need to check all the other cases.
  }
}

object DataWritingSparkTask extends Logging {
What is the reason we need to create a separate object for this function run? Why not move it to WriteToDataSourceV2Exec?
This is mainly for logging. If we inline this code into WriteToDataSourceV2Exec and make WriteToDataSourceV2Exec extend Logging, then we have to serialize and send WriteToDataSourceV2Exec to the executor side for the logging.
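(i.e. the pattern is roughly the following simplified sketch, with the stage id, attempt number, and error handling elided; the real run method in this PR takes more parameters.)

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}

// The executor-side logic and its log calls live in a standalone object, so
// the physical plan node itself never has to be shipped to executors just
// to emit task-side log lines.
object DataWritingSparkTask extends Logging {
  def run[T](factory: DataWriterFactory[T], partitionId: Int, iter: Iterator[T]): WriterCommitMessage = {
    val writer = factory.createWriter(0, partitionId, 0) // stageId/attemptNumber elided
    logInfo(s"Writer created for partition $partitionId.")
    iter.foreach(writer.write)
    writer.commit()
  }
}
```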
* @param attemptNumber The attempt number of the Spark task that runs the returned writer, which
*                      is usually 0 if the task is not a retried task or a speculative task.
*/
DataWriter<T> createWriter(int stageId, int partitionId, int attemptNumber);
not sure why we have stageId here. I'd make it more generic, e.g. a string for some job id, and then some numeric value (64 bit long) for epoch.
Force-pushed from b065003 to 7eeb3b0
Test build #82902 has finished for PR 19269 at commit
Test build #82904 has finished for PR 19269 at commit
FYI the latest commit passed the test
* Implementations can coordinate with driver during {@link #commit()} to make sure only one of
* these data writers can commit successfully. Or implementations can allow all of them to commit
* successfully, and have a way to revert committed data writers without the commit message, because
* Spark only accepts the commit message that arrives first and ignore others.
In the test case, could we implement the above logic?
Test build #82910 has finished for PR 19269 at commit
case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
case _ => new RowToInternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
}
Do we need to introduce a new function for initialization or setup of the writeTask and call it here?
DataWriterFactory.createWriter is executed on the executor side and can be used as the initialization phase.
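(To illustrate: a hypothetical file-based implementation where all per-task setup, such as opening the output file for this partition/attempt, happens inside createWriter(), which already runs on the executor; the class and file-naming scheme are invented, and commit/abort are simplified.)

```scala
import java.io.{BufferedWriter, FileWriter}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

// Hypothetical example: createWriter() acts as the per-task setup hook.
class FileDataWriterFactory(dir: String) extends DataWriterFactory[Row] {
  override def createWriter(stageId: Int, partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
    // Per-task initialization: open one output file per partition/attempt.
    val out = new BufferedWriter(new FileWriter(s"$dir/$stageId-$partitionId-$attemptNumber"))
    new DataWriter[Row] {
      override def write(row: Row): Unit = out.write(row.mkString(",") + "\n")
      override def commit(): WriterCommitMessage = { out.close(); new WriterCommitMessage {} }
      override def abort(): Unit = out.close()
    }
  }
}
```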
It sounds like the initialization stages are missing in the current protocol API design. We can do it later. BTW, for the other reviewers, this PR is not to implement a true atomic commit protocol. LGTM
Thanks! Merged to master. This is just the first commit of the data source v2 write protocol. More PRs are coming to further improve it.
w.r.t. init, I'm thinking it's critical to get the DataFrameWriter.extraOptions down the tree. This lets committers be tuned on a query-by-query basis for things like conflict management, rather than deciding upfront. I'm going to have the Hadoop MR committer factory support a
@steveloughran The current API already supports it.
thx; I'll see about passing it all the way down past FileOutputFormat
A working prototype for data source v2 write path. The writing framework is similar to the reading framework, i.e. `WriteSupport` -> `DataSourceV2Writer` -> `DataWriterFactory` -> `DataWriter`. Similar to the `FileCommitProtocol`, the writing API has job and task level commit/abort to support the transaction.

New tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#19269 from cloud-fan/data-source-v2-write.
### What changes were proposed in this pull request?

This is a followup of #19269. In #19269, there is only a Scala implementation of the simple writable data source in `DataSourceV2Suite`. This PR adds a Java implementation of it.

### Why are the changes needed?

To improve test coverage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing test suites.

Closes #31560 from kevincmchen/SPARK-34432.

Lead-authored-by: kevincmchen <kevincmchen@tencent.com>
Co-authored-by: Kevin Pis <68981916+kevincmchen@users.noreply.github.com>
Co-authored-by: Kevin Pis <kc4163568@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?

A working prototype for data source v2 write path.

The writing framework is similar to the reading framework, i.e. WriteSupport -> DataSourceV2Writer -> DataWriterFactory -> DataWriter.

Similar to the FileCommitProtocol, the writing API has job and task level commit/abort to support the transaction.

How was this patch tested?

new tests
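(For readers skimming the thread, the call sequence described above boils down to roughly the following driver/executor split; this is a simplified Scala sketch, not the exact code in WriteToDataSourceV2Exec, and error handling plus the job-level abort path are elided.)

```scala
import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer

object WritePathSketch {
  def writeJob(writer: DataSourceV2Writer, rdd: RDD[Row]): Unit = {
    val factory = writer.createWriterFactory()       // driver: created once, then serialized
    val messages = rdd.sparkContext.runJob(rdd, (ctx: TaskContext, iter: Iterator[Row]) => {
      val w = factory.createWriter(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
      iter.foreach(w.write)                          // executor: write one partition
      w.commit()                                     // task-level commit -> message to driver
    })
    writer.commit(messages)                          // driver: job-level commit
  }
}
```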