[SPARK-29331][SQL] create DS v2 Write at physical plan #26001


Closed
wants to merge 2 commits

Conversation

cloud-fan
Contributor

What changes were proposed in this pull request?

Create the DS v2 write at the physical plan instead of the logical plan, for streaming writes.

Why are the changes needed?

We may need some physical information when creating the DS v2 write, e.g. #25990. This also matches the batch write path.

Does this PR introduce any user-facing change?

no

How was this patch tested?

existing tests
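For context, a hedged sketch of the idea (illustrative names only, not the actual Spark source): the streaming write gets built by the physical exec node, where execution-time details are in scope, rather than during logical planning. The `Table`/`WriteBuilder` concepts come from the DSv2 API; the surrounding plumbing below is an assumption.

```scala
// Illustrative sketch only: the exec node name and fields are assumptions.
// Before this PR: the logical plan built the streaming write during analysis.
// After: the physical node builds it, so physical information
// (e.g. the child plan's partitioning) can be passed to the source.
case class StreamingWriteExec(
    table: Table,                       // DS v2 table
    writeOptions: Map[String, String],
    query: SparkPlan) {                 // physical child: partitioning known here

  lazy val streamingWrite = {
    val builder = table.newWriteBuilder(/* write info derived here */)
    builder.buildForStreaming()
  }
}
```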

@cloud-fan
Contributor Author

cc @brkyvz @jose-torres @edrevo

@SparkQA

SparkQA commented Oct 2, 2019

Test build #111692 has finished for PR 26001 at commit e112af9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 3, 2019

Test build #111708 has finished for PR 26001 at commit 79f94f5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@edrevo left a comment


Sweet! Thanks for the change!

@@ -50,7 +50,7 @@ private[kafka010] object KafkaWriter extends Logging {
       topic: Option[String] = None): Unit = {
     schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
       if (topic.isEmpty) {
-        throw new AnalysisException(s"topic option required when no " +
+        throw new IllegalArgumentException(s"topic option required when no " +
Contributor Author


Now we check the options at the physical planning phase, so this should not be an AnalysisException anymore.

Contributor Author


If this is unacceptable, then we may need to have analysis-time write info and runtime write info: Table.newWriteBuilder would take the analysis-time write info, and WriteBuilder.build would take the runtime write info.

I'm not sure if it's worth this complexity.
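The alternative described above could look roughly like the following hypothetical sketch. None of these traits exist in Spark's DSv2 API as-is; all names are illustrative, chosen only to show the two-phase split.

```scala
// Hypothetical sketch of splitting the write info into an analysis-time
// part and a runtime part. All traits and members here are assumptions.
trait AnalysisWriteInfo {
  def schema: StructType                // known during analysis
  def options: Map[String, String]
}
trait RuntimeWriteInfo {
  def numPartitions: Int                // only known at execution time
}

trait TableWithTwoPhaseWrite {
  // Called during analysis, so validation failures here could still
  // surface as AnalysisExceptions.
  def newWriteBuilder(info: AnalysisWriteInfo): TwoPhaseWriteBuilder
}
trait TwoPhaseWriteBuilder {
  // Physical details arrive only when the physical plan runs.
  def build(info: RuntimeWriteInfo): Write
}
```

The extra complexity this would add is exactly what the comment above questions the worth of.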

Contributor


Should it be SparkException? I think the last time we discussed these, it wasn't clear what type of exception to use after analysis. Maybe we need new exception types?

Contributor Author


We can use SparkException as well. IllegalArgumentException is a standard Java exception that indicates invalid input, so I think it's OK to use it even after analysis.
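As a small illustration of the convention being argued for (a hypothetical helper, not code from this PR): Scala's `require` throws IllegalArgumentException, which is the standard JVM way to reject invalid input at any phase, including execution.

```scala
// Hypothetical helper, for illustration only.
// Scala's require(...) throws IllegalArgumentException on failure,
// the standard JVM signal for invalid input.
def checkTopic(topic: Option[String], hasTopicAttribute: Boolean): Unit = {
  require(topic.nonEmpty || hasTopicAttribute,
    "topic option required when no topic attribute is present")
}
```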

Contributor


I think we typically want to always raise SparkException, because all exception types inherit from it, unless we are throwing an exception from a method that received an illegal argument; but that's not what is happening here.

Contributor Author


> I think we typically want to always raise SparkException because all exception types inherit from it.

In Spark SQL, no exceptions inherit from it. In fact, SparkException was rarely used in Spark SQL before we added the v2 commands. SparkException is defined in Spark core and is usually used when Spark fails to run a task.

In Spark SQL, AnalysisException and standard Java exceptions are more widely used.

Contributor


Sorry, I thought that AnalysisException inherited from SparkException. Looks like I was wrong.

@cloud-fan
Contributor Author

also cc @rdblue

@edrevo
Contributor

edrevo commented Oct 13, 2019

Anything I can do to help push this PR forward? I'd love to get this in so I can finish the PR that adds the number of partitions to DSv2.

@edrevo
Copy link
Contributor

edrevo commented Oct 23, 2019

All the checks have passed in this PR. Could it be merged?
