Skip to content

Commit ca08d3b

Browse files
committed
V1_BATCH_WRITE should also pass BATCH_WRITE checks
1 parent c18f849 commit ca08d3b

File tree

1 file changed

+10
-4
lines changed

1 file changed

+10
-4
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TableCapabilityCheck.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import org.apache.spark.sql.AnalysisException
2121
import org.apache.spark.sql.catalyst.expressions.Literal
2222
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
2323
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
24+
import org.apache.spark.sql.sources.v2.{SupportsWrite, Table}
2425
import org.apache.spark.sql.sources.v2.TableCapability._
2526
import org.apache.spark.sql.types.BooleanType
2627

@@ -32,6 +33,11 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
3233

3334
private def failAnalysis(msg: String): Unit = throw new AnalysisException(msg)
3435

36+
private def supportsBatchWrite(table: Table): Boolean = table match {
37+
case supportsWrite: SupportsWrite => supportsWrite.supportsAny(BATCH_WRITE, V1_BATCH_WRITE)
38+
case _ => false
39+
}
40+
3541
override def apply(plan: LogicalPlan): Unit = {
3642
plan foreach {
3743
case r: DataSourceV2Relation if !r.table.supports(BATCH_READ) =>
@@ -44,7 +50,7 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
4450
// TODO: check STREAMING_WRITE capability. It's not doable now because we don't have a
4551
// a logical plan for streaming write.
4652

47-
case AppendData(r: DataSourceV2Relation, _, _) if !r.table.supports(BATCH_WRITE) =>
53+
case AppendData(r: DataSourceV2Relation, _, _) if !supportsBatchWrite(r.table) =>
4854
failAnalysis(s"Table ${r.table.name()} does not support append in batch mode.")
4955

5056
case OverwritePartitionsDynamic(r: DataSourceV2Relation, _, _)
@@ -54,13 +60,13 @@ object TableCapabilityCheck extends (LogicalPlan => Unit) {
5460
case OverwriteByExpression(r: DataSourceV2Relation, expr, _, _) =>
5561
expr match {
5662
case Literal(true, BooleanType) =>
57-
if (!r.table.supports(BATCH_WRITE) ||
58-
!r.table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER)) {
63+
if (!supportsBatchWrite(r.table) ||
64+
!r.table.supportsAny(TRUNCATE, OVERWRITE_BY_FILTER)) {
5965
failAnalysis(
6066
s"Table ${r.table.name()} does not support truncate in batch mode.")
6167
}
6268
case _ =>
63-
if (!r.table.supports(BATCH_WRITE) || !r.table.supports(OVERWRITE_BY_FILTER)) {
69+
if (!supportsBatchWrite(r.table) || !r.table.supports(OVERWRITE_BY_FILTER)) {
6470
failAnalysis(s"Table ${r.table.name()} does not support " +
6571
"overwrite by filter in batch mode.")
6672
}

0 commit comments

Comments
 (0)