
[SPARK-22026][SQL] data source v2 write path #19269


Closed
cloud-fan wants to merge 5 commits into apache:master from cloud-fan:data-source-v2-write

Conversation

cloud-fan
Contributor

@cloud-fan cloud-fan commented Sep 18, 2017

What changes were proposed in this pull request?

A working prototype for data source v2 write path.

The writing framework is similar to the reading framework, i.e. WriteSupport -> DataSourceV2Writer -> DataWriterFactory -> DataWriter.

Similar to FileCommitProtocol, the writing API has job-level and task-level commit/abort to support transactions.
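For orientation, here is a hedged Scala sketch of how these pieces could fit together. The trait names follow this PR, but the signatures (the options map, the Option return, and the abort arguments, which reflect the later discussion about passing commit messages) are simplified and illustrative, not the exact Java API added by this patch.

```scala
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.types.StructType

// Sketch only: names mirror the PR, signatures are simplified for illustration.
trait WriteSupport {
  // Called on the driver; returning None means the write should be skipped
  // (e.g. the data already exists and the save mode says to ignore it).
  def createWriter(
      schema: StructType,
      mode: SaveMode,
      options: Map[String, String]): Option[DataSourceV2Writer]
}

trait DataSourceV2Writer {
  def createWriterFactory(): DataWriterFactory[Row]        // created on the driver, sent to executors
  def commit(messages: Array[WriterCommitMessage]): Unit   // job-level commit
  def abort(messages: Array[WriterCommitMessage]): Unit    // job-level abort, best effort
}

trait DataWriterFactory[T] extends Serializable {
  def createWriter(partitionId: Int, attemptNumber: Int): DataWriter[T]
}

trait DataWriter[T] {
  def write(record: T): Unit          // called once per record of the partition
  def commit(): WriterCommitMessage   // task-level commit; the message is sent to the driver
  def abort(): Unit                   // task-level abort
}

trait WriterCommitMessage extends Serializable
```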

How was this patch tested?

new tests

@SparkQA

SparkQA commented Sep 18, 2017

Test build #81891 has finished for PR 19269 at commit 7b6a6b7.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: LogicalPlan)
  • class RowToUnsafeRowWriteTask(rowWriteTask: WriteTask[Row], schema: StructType)
  • class RowToUnsafeRowDataWriter(rowWriter: DataWriter[Row], encoder: ExpressionEncoder[Row])

* tasks.
* 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If
* all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If
* some partitions failed and aborted, call {@link #abort()}.
Contributor Author

cc @rdblue @rxin do we really need an individual SPIP for the write path? I think this procedure is the only thing that needs some high-level discussion; the other parts are very similar to the read path, e.g. WriteSupport -> DataSourceV2Writer -> WriteTask -> DataWriter.

Contributor

The main reason why I wanted a separate SPIP for the write path was this point in the doc:

Ideally, the partitioning/bucketing concept should not be exposed in the Data Source API V2, because they are just techniques for data skipping and pre-partitioning. However, these 2 concepts are already widely used in Spark, e.g. DataFrameWriter.partitionBy and DDL syntax like ADD PARTITION. To be consistent, we need to add partitioning/bucketing to Data Source V2, so that implementations are able to specify partitioning/bucketing for read/write.

There's a lot in there that's worth thinking about and possibly changing:

  1. Ideally, the DataSourceV2 API wouldn't support bucketing/partitioning
  2. The current DataFrameWriter API is what we should continue to support
  3. Implementations should supply bucketing and partitioning for writes because of 2

Bucketing/partitioning: It comes down to the level at which this API is going to be used. It looks like this API currently ignores bucketing and partitioning (unless my read through was too quick). I think I agree that in the long term that's a good thing, but we need ways for a data source to tell Spark about its requirements for incoming data.

In the current version, it looks like Spark would know how to prepare data for writers outside of this API (rather than including support as suggested by point 3). When writing a partitioned table, Spark would get the partitioning from the table definition in the metastore and automatically sort by partition columns. Is that right?

I'd like to move the data store's requirements behind this API. For example, writing to HBase files directly requires sorting by key first. We don't want to do the sort in the writer because it may duplicate work (and isn't captured in the physical plan), and we also don't want to require Spark to know about the requirements of the HBase data store, or any other specific implementation.

DataFrameWriter API: I'd like to talk about separating the API for table definitions and writes, but not necessarily as part of this work. The SPIP should clearly state whether that's part of the scope for this API, and how the proposed API works for both if that work is going to be done in a future proposal. A good example of that is how bucketing/partitioning will be handled.

Contributor Author

I've sent an email to dev-list, let's discuss there.

@SparkQA

SparkQA commented Sep 19, 2017

Test build #81930 has finished for PR 19269 at commit 862a679.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: LogicalPlan)
  • class RowToUnsafeRowWriteTask(rowWriteTask: WriteTask[Row], schema: StructType)
  • class RowToUnsafeRowDataWriter(rowWriter: DataWriter[Row], encoder: ExpressionEncoder[Row])

* string-to-string map.
* @return a reader that implements the actual read logic.
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
Contributor

It would make this much easier to review if changes to the read path were taken out and committed in a follow-up to #19136.

* Creates a write task which will be serialized and sent to executors. For each partition of the
* input data(RDD), there will be one write task to write the records.
*/
WriteTask<Row> createWriteTask();
Contributor

I think it's confusing to have only one write "task" that is serialized and used everywhere. It is implicitly copied by the serialization into multiple distinct tasks. Is there a better name for it? Maybe call the DataWriter the WriteTask and serialize something with a better name?

Contributor Author

How about WriteTaskFactory and WriteTask? One benefit of the current naming is that it's consistent with the read path.

* The writing procedure is:
* 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the
* partitions of the input data(RDD).
* 2. For each partition, create a data writer with the write task, and write the data of the
Contributor

How does this handle speculative execution? This description makes it sound like attempts are only run serially. I'd like to have an interface that signals support for concurrent tasks, for data sources that act like the direct committer and can't handle speculation.

Contributor Author

Spark can't disable speculative tasks at runtime, so we can't support this feature.

I'll add more comments to say that data source writers have to deal with speculative tasks correctly.

Contributor

Maybe Spark should be able to disable speculative tasks at runtime instead of requiring data sources to rollback. I'd prefer that, but I don't think it is something that needs to change now. We can always add an interface if/when it is supported that allows data sources to communicate a lack of support for speculation.

Contributor

That doesn't really help if one of the tasks fails and gets relaunched.

Contributor

Maybe you're right. If a source supports rollback for sequential tasks, then that might mean it has to support rollback for concurrent tasks. I was originally thinking of a case like JDBC without transactions. So an insert actually creates the rows, and rolling back concurrent tasks would delete rows from the other task. But in that case, inserts are idempotent so it wouldn't matter. I'm not sure if there's a case where you can (or would) implement rolling back, but can't handle concurrency. Let's just leave it until someone has a use case for it.

/**
* Commits this writing job with a list of commit messages. The commit messages are collected from
* all data writers for this writing job and are produced by {@link DataWriter#commit()}. This
* also means all the data are written successfully and all data writers are committed.
Contributor

I think this should state the guarantees when this method is called:

  • One and only one attempt for every task has committed successfully
  • Messages contain the commit message from every committed task attempt, which is no more than one per task.
  • All other attempts have been successfully aborted (is this a guarantee, or just that aborts have been attempted?)
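For context on how these guarantees could arise on the driver, here is a hedged Scala sketch (not the code in this patch): at most one commit message is recorded per partition, the job-level commit runs only after every partition reports success, and on failure the job-level abort receives whatever messages arrived. `writeJob` and `runTask` are assumed helper names; the traits are the ones sketched earlier.

```scala
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Sketch only: Spark invokes the result handler once per partition, for the
// first attempt that succeeds, so `messages` holds at most one message per task.
def writeJob[T](
    sc: SparkContext,
    rdd: RDD[T],
    writer: DataSourceV2Writer,
    factory: DataWriterFactory[T],
    runTask: (DataWriterFactory[T], TaskContext, Iterator[T]) => WriterCommitMessage): Unit = {
  val messages = new Array[WriterCommitMessage](rdd.partitions.length)
  try {
    sc.runJob(
      rdd,
      (ctx: TaskContext, iter: Iterator[T]) => runTask(factory, ctx, iter),
      rdd.partitions.indices,
      (index: Int, message: WriterCommitMessage) => messages(index) = message)
    writer.commit(messages)
  } catch {
    case cause: Throwable =>
      // Entries for partitions that never committed are still null here.
      writer.abort(messages)
      throw cause
  }
}
```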

/**
* Aborts this writing job because some data writers are failed to write the records and aborted.
*/
void abort();
Contributor

Should this accept the commit messages for committed tasks, or will tasks be aborted?

I'm thinking of the case where you're writing to S3. Say a data source writes all attempt files to the final locations, then removes any attempts that are aborted. If the job aborts with some tasks that have already committed, then either this should have the option of cleaning up those files (passed in the commit message) or all of the tasks should be individually aborted. I'd prefer to have this abort clean up successful/committed tasks because the logic may be different.
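To make the scenario concrete, a hedged Scala sketch of that S3-style cleanup, assuming each task's commit message records the file's final location. The message class and helper below are invented for illustration; `WriterCommitMessage` is the interface from this PR.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative commit message: the task remembers where it wrote its final file.
case class CommittedFileMessage(finalLocation: String) extends WriterCommitMessage

// Job-level abort: delete the output of every task that managed to commit.
// Slots for tasks that never committed (or whose message was lost) are skipped.
def abortCommittedTasks(messages: Array[WriterCommitMessage], conf: Configuration): Unit = {
  messages.foreach {
    case CommittedFileMessage(location) =>
      val path = new Path(location)
      // Best effort: a cleanup failure should not mask the original job failure.
      try path.getFileSystem(conf).delete(path, false)
      catch { case _: java.io.IOException => }
    case _ => // nothing to clean up for this slot
  }
}
```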

Contributor Author

Let's say we have 400 partitions to write, and the first 100 write tasks succeed and commit. If the 101st write task fails, there is no way to re-launch the previous 100 write tasks and abort them; we can only ask DataSourceV2Writer to do a job-level abort.

So you are right, this method should also take a list of commit messages for already-successful write tasks.

Contributor Author

After thinking about it more, we can't do this. A Spark job may crash without knowing which tasks succeeded and which failed. So the abort here should be able to handle already-committed tasks.

Contributor

@rdblue rdblue Sep 28, 2017

Can you explain the situation you're talking about a bit more?

I think Spark should pass everything it can to the abort. I agree that the abort here should be flexible and a best-effort, but there are situations where the commit logic can't know how to roll back everything that tasks did without those commit messages. There may be cases where Spark can't guarantee whether a commit was complete or not, but where it can, it should pass those commit messages.

Say you had 100 successful write tasks of 400, but the executor died before it returned the 100th message (but after committing data) and that executor failure hit the max and Spark cancelled the job. Even though Spark has only 99 commit messages, it should still pass the ones that it can. In the S3 case, these are the only way the driver knows to delete the other 99 files. So the choice is between deleting 99 of 100 committed files, and leaving all 100 taking up space. I think rolling back 99 is much better than 0.

Contributor

Knowing what's being committed can provide a bit more information as to what is going on, and could be appreciated for that. The biggest issue I fear is loss of the driver itself, so no abort() call is made. A strategy for cleaning up from that would be good, even though it's primarily one of bringing up a new writer and saying "clean up this mess a previous instance left". Looking at the S3 example, my strategy there is/would be: identify all pending commits, abort them, then clean up the dest dir.

@InterfaceStability.Evolving
public interface DataWriter<T> {

void write(T record);
Contributor

What happens if this throws an exception?

Contributor

@rxin rxin Sep 20, 2017

not allowed :)

We should specify all the exception behavior in all the APIs, not just this function.

@cloud-fan can you create a JIRA ticket for this to make sure we follow up on it?


Contributor

I thought that might be the case. :)

But, we can't count on not throwing exceptions when writing so it would be good to also document what happens when a writer does throw an exception here.

import org.apache.spark.util.Utils

case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: LogicalPlan)
extends RunnableCommand {
Contributor

I know similar tasks do the same, but this should not implement RunnableCommand. I'm not sure of the original intent for it, but I think RunnableCommand should be used for small tasks that are carried out on the driver, like DDL.

Using RunnableCommand in cases like this where a job needs to run ends up effectively linking a logical plan into a physical plan, which has caused a few messy issues. For example, the problem where the Spark SQL tab doesn't show the entire operation and only shows the outer command without metrics.

Contributor

RunnableCommand simply means it's both a logical plan and a physical plan.

We should fix the UI issue separately (which on its own is super annoying).

Contributor

It isn't just the UI though. I've seen cases where InsertIntoHadoopFsRelationCommand is run inside a CreateDatasourceTableAsSelectCommand, and their enforcement of write modes depends on one another. Our internal S3 committer updates partition information and handles partition-level conflicts, but that requires that the table exists before the write (to check what partitions already exist). When we moved table creation into the CTAS command, it broke the insert-into command, when these two should be separate.

While it's convenient to have a logical plan and a physical plan together, I think this ends up getting misused. That's why I'm advocating to change how we use RunnableCommand. To fix it, we should introduce a node with a command and logical plan, so we can optimize the entire plan and run the command at the right time.

Clearly, this isn't a blocker for this PR, I just want to mention that I see this pattern causing a lot of problems every time we pull in a new Spark version.

Contributor

I've also observed this issue where the explain output of commands behaves differently than from logical plans, and have a repro at https://issues.apache.org/jira/browse/SPARK-22204

@InterfaceStability.Evolving
@Experimental
@InterfaceStability.Unstable
public interface SupportsWriteUnsafeRow extends DataSourceV2Writer {
Contributor

@rxin rxin Sep 20, 2017

why do we need this? As in, what data source would want to write UnsafeRow out directly? It's an internal format.

@SparkQA

SparkQA commented Sep 27, 2017

Test build #82249 has finished for PR 19269 at commit d06ef81.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: LogicalPlan)
  • class RowToInternalRowDataWriteFactory(rowWriterFactory: DataWriteFactory[Row], schema: StructType)
  • class RowToInternalRowDataWriter(rowWriter: DataWriter[Row], encoder: ExpressionEncoder[Row])

* Note that some data writer may already be committed in this case, implementations should be
* aware of this and clean up the data.
*/
void abort();
Contributor

This still needs to be passed the WriterCommitMessages for committed tasks. (My previous comment is gone now)

Contributor Author

@cloud-fan cloud-fan Sep 28, 2017

I replied on your previous comment: #19269 (comment)

The problem is that, if one writer commits and another writer aborts at the same time, the abort signal may arrive at the driver first, and the driver side doesn't know there is a writer that has already committed.

Contributor

Sorry I missed that!

@steveloughran
Contributor

People may know that I'm busy with some S3 committers which work with Hadoop MapReduce & Spark, with an import of Ryan's committer into the Hadoop codebase. This includes changes to s3a to support that and an alternative design which relies on a consistent S3, a Spark committer (not in the Spark codebase itself) to handle it there, plus the tests & documentation of how commit actually works. For that, the conclusion I've reached is: nobody really knows what's going on, and it's a miracle things work at all.

FWIW, I think I now have what is the closest thing to documentation of what goes on at the Hadoop FS Committer layer, based on code, tests, stepping through operations in an IDE, and other archaeology.

Based on that experience, I think a key deliverable ought to be some specification of what a committer does. I know there are bits in the javadocs like "Implementations should handle this case correctly", but without a definition of "correct" it's hard to point at an implementation and say "you've got it wrong".

I would propose supplementing the code with

  1. A rigorous specification, including possible workflows. scala & RFC2119 keywords, maybe.
  2. Tests against that, including attempts to simulate the failure modes, all the orders of execution which aren't expected to be valid, etc.

Some general questions related to this:

  • If a writer doesn't support speculation, can it say so? I know speculation and failure recovery are related, but task retry after failure is linearized, whereas speculation can happen in parallel.
  • Is it possible to instantiate a second writer and say "abort the output of the other writer" (on a different host?). This allows for cleanup of a task's work after the failure of the entire executor. If it's not possible, then the job commit must be required to clean up. Maybe: pass to the job commit information about failing tasks, so it has more of an idea what to do. (Hadoop MapReduce example: AM calls abortTask() for all failing containers before instantiating a new one and retrying)
  • MAY the intermediate output of a task be observable to others?
  • MAY the committed output of a task be observable to others? If so, what does this mean for readers? Is it something which a writer may wish to declare/warn callers about?
  • What if DataWriter.commit() just doesn't return/the executor fails during that commit process? Is that a failure of the entire job vs task? (FWIW MR algorithm 1 handles this, algorithm 2 doesn't).
  • What if writer.abort() raises an exception ? (not unusual if cause of commit failure is network/auth problem)
  • What if writer.abort() is called before any other operation on that writer? Better be a no-op.
  • What if DataSourceV2Writer.commit() fails? Can it be retried? (Easiest to say no, obviously).
  • If, after a DataSourceV2Writer.commit() fails, can DataSourceV2Writer.abort() be called?

logInfo(s"Data source writer $writer committed.")
} catch {
case cause: Throwable =>
writer.abort()
Contributor

this may raise an exception too...better to use Utils.tryWithSafeFinallyAndFailureCallbacks()
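As a sketch of the defensive pattern being suggested (whether via that utility or a hand-rolled equivalent as below): the original failure is always the one re-thrown, and an exception from the abort is attached as a suppressed exception instead of masking it. The helper name here is invented.

```scala
// Sketch only, not the Spark utility itself.
def commitOrAbort[T](commitBlock: => T)(abortBlock: => Unit): T = {
  try {
    commitBlock
  } catch {
    case original: Throwable =>
      try abortBlock
      catch { case abortFailure: Throwable => original.addSuppressed(abortFailure) }
      throw original
  }
}

// Usage in the spirit of the code above:
//   commitOrAbort { writer.commit(messages) } { writer.abort(messages) }
```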

return new JavaSimpleCSVReadTask(Collections.emptyIterator());
}
} catch (IOException e) {
throw new RuntimeException(e);
Contributor

Why the translation?

Contributor Author

Because IOException is a checked exception and needs to be declared in the method signature. This is just a test, so I want to keep it as simple as possible.

Contributor

Does Spark have a RuntimeIOException? I typically like to use those so that the IOException can still be caught and handled by code that chooses to.

Contributor

I see. I'd just declare testXYZ throws Throwable and not worry about what gets raised internally: saves on try/catch translation logic


@Override
public void abort() {
fileWriter.close();
Contributor

@steveloughran steveloughran Sep 30, 2017

if you are catching and wrapping exceptions, this should be included in the try/catch clause. Also handle fileWriter==null
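A short hedged sketch of the null-safe abort being asked for (in Scala for brevity; the test class itself is Java). It assumes the writer may fail before fileWriter is ever opened, and keeps the same IOException-to-RuntimeException wrapping used elsewhere in the test.

```scala
import java.io.{IOException, Writer}

// Illustrative only: abort() must cope with a writer that was never opened.
class NullSafeAbort(private var fileWriter: Writer) {
  def abort(): Unit = {
    try {
      if (fileWriter != null) fileWriter.close()
    } catch {
      case e: IOException => throw new RuntimeException(e)
    }
  }
}
```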


if (Files.exists(Paths.get(path))) {
if (mode == SaveMode.ErrorIfExists) {
throw new RuntimeException("data already exists.");
Contributor

I'd include the path here.

}
}

static class JavaSimpleCSVDataWriteFactory implements DataWriteFactory<Row>, DataWriter<Row> {
Contributor

If these test classes worked with a Hadoop FS, they'd not just be testable against the local file:// FS; you could bring up a miniDFS cluster and see how that worked. Why? It gives a broader test of how well the API works against HDFS and the various failure modes, but also of how you pass configuration down.

val rdd = children.head.execute()
val messages = new Array[WriterCommitMessage](rdd.partitions.length)

logInfo(s"Start processing data source writer: $writer")
Contributor

maybe add the # of partitions in the log; it helps provide a hint of how long it's going to take. If a job hangs, this'll be the last entry in the log, so it's good to be informative

// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
iter.foreach(dataWriter.write)
dataWriter.commit()
Contributor

good to log something here, at least @ debug

@steveloughran
Contributor

One other thing that would be good now and invaluable in future is for the DataWriter.commit() call to return a Map[String,Long] of statistics alongside the message sent to the committer. The spec should say these statistics "MUST be specific to the writer/its thread", so that aggregating the stats across all tasks produces valid output.

What does this do? It lets the writers provide real statistics about the cost of operations. If you look at the changes to {{FileFormatWriter}} to collect stats, it's just listing the written file after close() and returning file size as its sole metric. We are moving to instrumenting more of the Hadoop output streams with statistics collection, and, once there's an API for getting at the values, this would allow the driver to aggregate stats from the writer for the writes and the commits. Examples: bytes written, files written, records written, # of 503/throttle events sent back from S3, # of network failures and retried operations, ...etc. Once the writers start collecting this data, there's motivation for the layers underneath to collect more and publish what they get. As an example, here's the data collected by S3AOutputStream, exposed via OutputStream.toString() as well as fed to Hadoop metrics. As well as bytes written, it tracks blocks PUT, retries on completion operations, and how many times a block PUT failed and had to be retried. That means that a job can have results like "it took X seconds and wrote Y bytes but it had to repost Z bytes of data, which made things slow".
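A hypothetical sketch of that proposal: each task's commit message carries a writer-local Map[String, Long], and the driver sums them at job commit. The class and helper names here are invented; `WriterCommitMessage` is the (empty) interface from this PR.

```scala
// Illustrative only: per-task statistics ride along with the commit message.
case class StatsCommitMessage(stats: Map[String, Long]) extends WriterCommitMessage

// Driver-side aggregation: sum the values of matching keys across all tasks.
def aggregateStats(messages: Array[WriterCommitMessage]): Map[String, Long] =
  messages
    .collect { case m: StatsCommitMessage => m.stats }
    .foldLeft(Map.empty[String, Long]) { (total, taskStats) =>
      taskStats.foldLeft(total) { case (acc, (key, value)) =>
        acc.updated(key, acc.getOrElse(key, 0L) + value)
      }
    }
```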

There's a side issue: what is the proposed mandated re-entrancy policy in the writers?

Is the expectation that DataWriter.write() will always be called by a single thread, and therefore there is no need to implement thread-safe writes to the output stream? Or is there a risk that more than one thread may write sequentially (preventing thread-local storage for collecting stats) or even simultaneously? (If so, the example is in trouble, as the java.io APIs say there is no need to support re-entrancy, even if HDFS does.) Again, this is the kind of thing where some specification can highlight the policy; otherwise people will code against the implementation, which is precisely why HDFS DFSOutputStreams are stuck doing thread-safe writes (see HBase).

@cloud-fan
Contributor Author

Hi @steveloughran , thanks for your comments! We should have a detailed specification of this framework, especially for the behaviors at Spark side. I'll update soon.

@cloud-fan
Contributor Author

cloud-fan commented Oct 9, 2017

Several things to discuss:

  1. Since Spark can't disable speculation at runtime, currently there is not much benefit in providing an interface for a data source to disable speculation, because the data source can check the Spark conf at the beginning and throw an exception if speculation is enabled. We can add it later via a mix-in trait.
  2. The only contract Spark needs is: data written/committed by tasks should not be visible to data source readers until the job-level commit. But it can be visible to others, like other writing tasks, so it's possible for data sources to implement "abort the output of the other writer".
  3. The WriterCommitMessage can include statistics (it's an empty interface), so data sources can aggregate statistics at the driver side.
  4. Making DataSourceV2Writer.abort take commit messages is still a "best effort" to clean up the data; there are still some cases where we can't get the commit messages, e.g. a task failed to abort, or a speculative task committed but the commit message didn't reach the driver side. I think maybe it's better to ask data sources to put task output in a central place so that we can clean up all the task data without the commit messages.

cc @steveloughran @rdblue

@steveloughran
Contributor

+1 for the ability to return statistics: the remote stores have lots of information which committers may return

@cloud-fan cloud-fan force-pushed the data-source-v2-write branch from d06ef81 to 2d41e44 Compare October 9, 2017 22:15
@rdblue
Contributor

rdblue commented Oct 9, 2017

The only contract Spark needs is: data written/committed by tasks should not be visible to data source readers until the job-level commitment. But they can be visible to others like other writing tasks, so it's possible for data sources to implement "abort the output of the other writer".

I'm not following what you mean here.

making DataSourceV2Writer.abort take commit messages is still a "best-effort" to clean up the data

Agreed. We should state something about this in the abort job docs: "Commit messages passed to abort are the messages for all commits that succeeded and sent a commit message to the driver. It is possible, though unlikely, for an executor to successfully commit data to a data source, but fail before sending the commit message to the driver."

@SparkQA

SparkQA commented Oct 9, 2017

Test build #82571 has finished for PR 19269 at commit 2d41e44.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: LogicalPlan)
  • class RowToInternalRowDataWriteFactory(rowWriterFactory: DataWriteFactory[Row], schema: StructType)
  • class RowToInternalRowDataWriter(rowWriter: DataWriter[Row], encoder: ExpressionEncoder[Row])

}
if (mode == SaveMode.Overwrite) {
if (fs.exists(path)) {
fs.delete(path, true)
Contributor Author

cc @steveloughran, overwrite needs this check because we need to delete the root dir here. The writers always create files with unique names, so we can't delete old files while overwriting.

Contributor

aah, but delete() doesn't throw any exception if the path doesn't exist, just returns false. And we have the tests to verify this. So the check is moot
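A small sketch of the simplified overwrite branch this implies, assuming a Hadoop FileSystem: delete() simply returns false when the path is absent, so the exists() probe can be dropped.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

// Illustrative only: recursive delete of the output root; Hadoop's delete()
// does not throw when the path is missing, it just returns false.
def prepareOverwrite(fs: FileSystem, path: Path, mode: SaveMode): Unit = {
  if (mode == SaveMode.Overwrite) {
    fs.delete(path, true)
  }
}
```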

@SparkQA

SparkQA commented Oct 17, 2017

Test build #82829 has finished for PR 19269 at commit 416da98.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 17, 2017

Test build #82844 has finished for PR 19269 at commit acf00ec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 18, 2017

Test build #82872 has finished for PR 19269 at commit 9e12d9f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will
* not be processed. If all records are successfully written, {@link #commit()} is called.
*
* If this data writer successes(all records are successfully written and {@link #commit()}
Member

@gatorsmile gatorsmile Oct 18, 2017

successes -> succeeds. You might need to check all the other cases.

}
}

object DataWritingSparkTask extends Logging {
Member

What is the reason we need to create a separate object for this function? Why not move it into WriteToDataSourceV2Exec?

Contributor Author

This is mainly for logging. If we inline this code into WriteToDataSourceV2Exec and make WriteToDataSourceV2Exec extend Logging, then we have to serialize and send WriteToDataSourceV2Exec to the executor side for the logging.

* @param attemptNumber The attempt number of the Spark task that runs the returned writer, which
* is usually 0 if the task is not a retried task or a speculative task.
*/
DataWriter<T> createWriter(int stageId, int partitionId, int attemptNumber);
Contributor

not sure why we have stageId here. I'd make it more generic, e.g. a string for some job id, and then some numeric value (64 bit long) for epoch.

@cloud-fan cloud-fan force-pushed the data-source-v2-write branch from b065003 to 7eeb3b0 Compare October 19, 2017 05:52
@SparkQA

SparkQA commented Oct 19, 2017

Test build #82902 has finished for PR 19269 at commit b065003.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 19, 2017

Test build #82904 has finished for PR 19269 at commit 7eeb3b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

FYI the latest commit passed the test

* Implementations can coordinate with driver during {@link #commit()} to make sure only one of
* these data writers can commit successfully. Or implementations can allow all of them to commit
* successfully, and have a way to revert committed data writers without the commit message, because
* Spark only accepts the commit message that arrives first and ignore others.
Member

In the test case, could we implement the above logic?

@SparkQA

SparkQA commented Oct 19, 2017

Test build #82910 has finished for PR 19269 at commit 7eeb3b0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
case _ => new RowToInternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
}

Member

@gatorsmile gatorsmile Oct 20, 2017

Do we need to introduce a new function for initialization or setup of the writeTask and call it here?

Contributor Author

DataWriterFactory.createWriter is executed on the executor side and can be used as the initialization phase.

@gatorsmile
Member

gatorsmile commented Oct 20, 2017

It sounds like the initialization stages are missing in the current protocol API design. We can do it later.

BTW, for the other reviewers, this PR is not to implement a true atomic commit protocol.

LGTM

@gatorsmile
Member

Thanks! Merged to master.

This is just the first commit of the data source v2 write protocol. More PRs are coming to further improve it.

@asfgit asfgit closed this in b034f25 Oct 20, 2017
@steveloughran
Contributor

w.r.t. init, I'm thinking it's critical to get the DataFrameWriter.extraOptions down the tree. This lets committers be tuned on a query-by-query basis for things like conflict management, rather than decided upfront. I'm going to have the Hadoop MR committer factory support a Map[String, String] for the options, with the expectation that options here will override any in the job conf.

@cloud-fan
Contributor Author

@steveloughran The current API already supports it. WriteSupport.createWriter takes the options parameter, and DataSourceV2Writer can propagate these options down to the data writer via createWriterFactory and createWriter.

@steveloughran
Contributor

thx; I'll see about passing it all the way down past FileOutputFormat

jzhuge pushed a commit to jzhuge/spark that referenced this pull request Aug 20, 2018
A working prototype for data source v2 write path.

The writing framework is similar to the reading framework. i.e. `WriteSupport` -> `DataSourceV2Writer` -> `DataWriterFactory` -> `DataWriter`.

Similar to the `FileCommitProtocol`, the writing API has job and task level commit/abort to support the transaction.

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#19269 from cloud-fan/data-source-v2-write.
cloud-fan pushed a commit that referenced this pull request Feb 22, 2021
### What changes were proposed in this pull request?

This is a followup of #19269

In #19269 , there is only a scala implementation of simple writable data source in `DataSourceV2Suite`.

This PR adds a java implementation of it.

### Why are the changes needed?

To improve test coverage.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

existing testsuites

Closes #31560 from kevincmchen/SPARK-34432.

Lead-authored-by: kevincmchen <kevincmchen@tencent.com>
Co-authored-by: Kevin Pis <68981916+kevincmchen@users.noreply.github.com>
Co-authored-by: Kevin Pis <kc4163568@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>