
[SPARK-27576][SQL] table capability to skip the output column resolution #24469


Closed · wants to merge 3 commits
@@ -84,5 +84,10 @@ public enum TableCapability {
    * <p>
    * See {@code org.apache.spark.sql.sources.v2.writer.SupportsDynamicOverwrite}.
    */
-  OVERWRITE_DYNAMIC
+  OVERWRITE_DYNAMIC,
+
+  /**
+   * Signals that the table accepts input of any schema in a write operation.
+   */
+  ACCEPT_ANY_SCHEMA
 }
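
For context, a connector opts in by returning the new capability from Table#capabilities(). A minimal sketch, assuming the v2 API as of this change (the class name is hypothetical; a real sink would also mix in SupportsWrite and implement a write builder):

```scala
import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

// Hypothetical table: the reported schema is empty, but ACCEPT_ANY_SCHEMA
// tells Spark not to check input columns against it on write.
class AnySchemaTable extends Table {
  override def name(): String = "any-schema-table"
  override def schema(): StructType = new StructType()
  override def capabilities(): util.Set[TableCapability] =
    Set(TableCapability.BATCH_WRITE, TableCapability.ACCEPT_ANY_SCHEMA).asJava
}
```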
@@ -21,4 +21,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

 trait NamedRelation extends LogicalPlan {
   def name: String
+
+  // When false, the schema of the input data must match the schema of this relation during writes.
+  def skipSchemaResolution: Boolean = false
 }
@@ -393,14 +393,17 @@ trait V2WriteCommand extends Command {
   override lazy val resolved: Boolean = outputResolved

   def outputResolved: Boolean = {
-    table.resolved && query.resolved && query.output.size == table.output.size &&
-      query.output.zip(table.output).forall {
-        case (inAttr, outAttr) =>
-          // names and types must match, nullability must be compatible
-          inAttr.name == outAttr.name &&
-            DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
-            (outAttr.nullable || !inAttr.nullable)
-      }
+    // If the table doesn't require a schema match, we don't need to resolve the output columns.
+    table.skipSchemaResolution || {
+      table.resolved && query.resolved && query.output.size == table.output.size &&
+        query.output.zip(table.output).forall {
+          case (inAttr, outAttr) =>
+            // names and types must match, nullability must be compatible
+            inAttr.name == outAttr.name &&
+              DataType.equalsIgnoreCompatibleNullability(outAttr.dataType, inAttr.dataType) &&
+              (outAttr.nullable || !inAttr.nullable)
+        }
+    }
   }
 }
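To make the rule being bypassed concrete, here is a tiny self-contained sketch of the nullability half of the per-column check (Attr is a stand-in for Spark's attribute class, not the real API):

```scala
// Simplified stand-in for Spark's attribute class, for illustration only.
case class Attr(name: String, nullable: Boolean)

// Mirrors the (outAttr.nullable || !inAttr.nullable) check above: writing a
// non-nullable column into a nullable one is fine; the reverse is not.
def nullabilityCompatible(inAttr: Attr, outAttr: Attr): Boolean =
  outAttr.nullable || !inAttr.nullable

assert(nullabilityCompatible(Attr("x", nullable = false), Attr("x", nullable = true)))
assert(!nullabilityCompatible(Attr("x", nullable = true), Attr("x", nullable = false)))
```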
@@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.analysis

 import java.util.Locale

 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Expression, LessThanOrEqual, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LeafNode, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Project}
-import org.apache.spark.sql.types.{DoubleType, FloatType, StructField, StructType}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types._

 class V2AppendDataAnalysisSuite extends DataSourceV2AnalysisSuite {
   override def byName(table: NamedRelation, query: LogicalPlan): LogicalPlan = {
@@ -104,6 +104,12 @@ case class TestRelation(output: Seq[AttributeReference]) extends LeafNode with NamedRelation {
   override def name: String = "table-name"
 }

+case class TestRelationAcceptAnySchema(output: Seq[AttributeReference])
+  extends LeafNode with NamedRelation {
+  override def name: String = "test-name"
+  override def skipSchemaResolution: Boolean = true
+}
+
 abstract class DataSourceV2AnalysisSuite extends AnalysisTest {
   val table = TestRelation(StructType(Seq(
     StructField("x", FloatType),
@@ -446,6 +452,27 @@ abstract class DataSourceV2AnalysisSuite extends AnalysisTest {
       "Cannot safely cast", "'x'", "DoubleType to FloatType"))
   }

+  test("bypass output column resolution") {
+    val table = TestRelationAcceptAnySchema(StructType(Seq(
+      StructField("a", FloatType, nullable = false),
+      StructField("b", DoubleType))).toAttributes)
+
+    val query = TestRelation(StructType(Seq(
+      StructField("s", StringType))).toAttributes)
+
+    withClue("byName") {
+      val parsedPlan = byName(table, query)
+      assertResolved(parsedPlan)
+      checkAnalysis(parsedPlan, parsedPlan)
+    }
+
+    withClue("byPosition") {
+      val parsedPlan = byPosition(table, query)
+      assertResolved(parsedPlan)
+      checkAnalysis(parsedPlan, parsedPlan)
+    }
+  }
+
   def assertNotResolved(logicalPlan: LogicalPlan): Unit = {
     assert(!logicalPlan.resolved, s"Plan should not be resolved: $logicalPlan")
   }

Review thread on the withClue("byPosition") line:

Contributor Author: @rdblue if two tests are very similar, it's recommended to merge them into one and use withClue to distinguish the cases. For example, when the byPosition case fails, we will see

    bypass output column resolution *** FAILED *** (36 milliseconds)
    [info]   byPosition (DataSourceV2AnalysisSuite.scala:473)
    [info]   org.scalatest.exceptions.TestFailedException:
    ...

Contributor Author: I'll send a follow-up PR to update the entire test suite to use withClue later.

Contributor: Okay, sounds good.
@@ -44,7 +44,10 @@ private[noop] object NoopTable extends Table with SupportsWrite {
   override def name(): String = "noop-table"
   override def schema(): StructType = new StructType()
   override def capabilities(): util.Set[TableCapability] = {
-    Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava
+    Set(
+      TableCapability.BATCH_WRITE,
+      TableCapability.STREAMING_WRITE,
+      TableCapability.ACCEPT_ANY_SCHEMA).asJava
   }
 }
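With ACCEPT_ANY_SCHEMA in place, writing a DataFrame of any shape to the noop sink should pass analysis even though NoopTable reports an empty schema. A usage sketch, assuming a SparkSession named spark:

```scala
// The query schema (bigint, string) differs from NoopTable's empty schema,
// but the analyzer now skips output column resolution for this sink.
spark.range(5)
  .selectExpr("id", "CAST(id AS STRING) AS s")
  .write
  .format("noop")
  .mode("append")
  .save()
```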
@@ -44,6 +44,8 @@ case class DataSourceV2Relation(

   override def name: String = table.name()

+  override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)
+
   override def simpleString(maxFields: Int): String = {
     s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
   }