Commit 7e08697

[SPARK-53720][SQL] Simplify extracting Table from DataSourceV2Relation
1 parent 75267bc commit 7e08697
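
The change is driven by a small extractor object, DataSourceV2Table, added in DataSourceV2Relation.scala below: its unapply returns only the relation's table, so call sites that need nothing else no longer spell out the remaining four constructor fields as wildcards. A minimal, self-contained sketch of the technique (Tbl, Rel, RelTable, and ExtractorDemo are hypothetical stand-ins, not Spark's classes):

// Stand-ins for Spark's Table and DataSourceV2Relation, for illustration only.
trait Tbl { def name: String }
final case class Rel(table: Tbl, output: Seq[String], catalog: Option[String],
    identifier: Option[String], options: Map[String, String])

// The extractor exposes just the field most matches care about.
object RelTable {
  def unapply(r: Rel): Option[Tbl] = Some(r.table)
}

object ExtractorDemo extends App {
  val rel = Rel(new Tbl { val name = "t1" }, Seq("a"), None, None, Map.empty)

  // Before: the full constructor pattern, four wildcards deep.
  rel match {
    case Rel(t, _, _, _, _) => println(s"table = ${t.name}")
  }

  // After: the extractor hides the unused fields.
  rel match {
    case RelTable(t) => println(s"table = ${t.name}")
  }
}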

File tree

12 files changed (+31, -29 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala

Lines changed: 4 additions & 4 deletions
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
 import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2, SupportsRowLevelOperations, TruncatableTable}
 import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

 /**
@@ -40,11 +40,11 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case d @ DeleteFromTable(aliasedTable, cond) if d.resolved =>
       EliminateSubqueryAliases(aliasedTable) match {
-        case DataSourceV2Relation(_: TruncatableTable, _, _, _, _) if cond == TrueLiteral =>
+        case DataSourceV2Table(_: TruncatableTable) if cond == TrueLiteral =>
           // don't rewrite as the table supports truncation
           d

-        case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _, _) =>
+        case r @ DataSourceV2Table(t: SupportsRowLevelOperations) =>
           val table = buildOperationTable(t, DELETE, CaseInsensitiveStringMap.empty())
           table.operation match {
             case _: SupportsDelta =>
@@ -53,7 +53,7 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand {
               buildReplaceDataPlan(r, table, cond)
           }

-        case DataSourceV2Relation(_: SupportsDeleteV2, _, _, _, _) =>
+        case DataSourceV2Table(_: SupportsDeleteV2) =>
           // don't rewrite as the table supports deletes only with filters
           d

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala

Lines changed: 2 additions & 2 deletions
@@ -29,7 +29,7 @@ import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
 import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -125,7 +125,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
         if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution =>

       EliminateSubqueryAliases(aliasedTable) match {
-        case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) =>
+        case r @ DataSourceV2Table(tbl: SupportsRowLevelOperations) =>
           validateMergeIntoConditions(m)
           val table = buildOperationTable(tbl, MERGE, CaseInsensitiveStringMap.empty())
           table.operation match {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala

Lines changed: 2 additions & 2 deletions
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
 import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
 import org.apache.spark.sql.connector.write.{RowLevelOperationTable, SupportsDelta}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -40,7 +40,7 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
         if u.resolved && u.rewritable && u.aligned =>

       EliminateSubqueryAliases(aliasedTable) match {
-        case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) =>
+        case r @ DataSourceV2Table(tbl: SupportsRowLevelOperations) =>
           val table = buildOperationTable(tbl, UPDATE, CaseInsensitiveStringMap.empty())
           val updateCond = cond.getOrElse(TrueLiteral)
           table.operation match {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 2 additions & 3 deletions
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, DataSourceV2Table}
 import org.apache.spark.sql.internal.SQLConf

 trait OperationHelper extends AliasHelper with PredicateHelper {
@@ -436,8 +436,7 @@ object GroupBasedRowLevelOperation {
   type ReturnType = (ReplaceData, Expression, Option[Expression], LogicalPlan)

   def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
-    case rd @ ReplaceData(DataSourceV2Relation(table, _, _, _, _),
-        cond, query, _, _, groupFilterCond, _) =>
+    case rd @ ReplaceData(DataSourceV2Table(table), cond, query, _, _, groupFilterCond, _) =>
       // group-based UPDATEs that are rewritten as UNION read the table twice
       val allowMultipleReads = rd.operation.command == UPDATE
       val readRelation = findReadRelation(table, query, allowMultipleReads)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala

Lines changed: 5 additions & 5 deletions
@@ -39,7 +39,7 @@ import org.apache.spark.sql.connector.write.{DeltaWrite, RowLevelOperation, RowL
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.{DELETE, MERGE, UPDATE}
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, AtomicType, BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
 import org.apache.spark.util.ArrayImplicits._
@@ -263,7 +263,7 @@ case class ReplaceData(

   lazy val operation: RowLevelOperation = {
     EliminateSubqueryAliases(table) match {
-      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+      case DataSourceV2Table(RowLevelOperationTable(_, operation)) =>
         operation
       case _ =>
         throw new AnalysisException(
@@ -345,7 +345,7 @@ case class WriteDelta(

   lazy val operation: SupportsDelta = {
     EliminateSubqueryAliases(table) match {
-      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
+      case DataSourceV2Table(RowLevelOperationTable(_, operation)) =>
         operation.asInstanceOf[SupportsDelta]
       case _ =>
         throw new AnalysisException(
@@ -834,7 +834,7 @@ case class UpdateTable(

   lazy val rewritable: Boolean = {
     EliminateSubqueryAliases(table) match {
-      case DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => true
+      case DataSourceV2Table(_: SupportsRowLevelOperations) => true
       case _ => false
     }
   }
@@ -878,7 +878,7 @@ case class MergeIntoTable(

   lazy val rewritable: Boolean = {
     EliminateSubqueryAliases(targetTable) match {
-      case DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => true
+      case DataSourceV2Table(_: SupportsRowLevelOperations) => true
       case _ => false
     }
   }

sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala

Lines changed: 4 additions & 0 deletions
@@ -231,6 +231,10 @@ case class StreamingDataSourceV2ScanRelation(
   override protected def stringArgs: Iterator[Any] = stringArgsVal.iterator
 }

+object DataSourceV2Table {
+  def unapply(relation: DataSourceV2Relation): Option[Table] = Some(relation.table)
+}
+
 object DataSourceV2Relation {
   def create(
       table: Table,
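
Because unapply above returns Some(relation.table) unconditionally, DataSourceV2Table(...) matches every DataSourceV2Relation; any narrowing comes from the type pattern applied to the extracted table, and an r @ binder still captures the whole relation, as in case r @ DataSourceV2Table(tbl: SupportsRowLevelOperations) elsewhere in this diff. A sketch of that combination, again with hypothetical stand-in types (MockTable, MockRelation, MockTableOf) rather than Spark's:

// Stand-ins, for illustration only; the pattern shapes mirror the diff above.
sealed trait MockTable
final class RowLevelTable extends MockTable { override def toString = "row-level" }
final class PlainTable extends MockTable { override def toString = "plain" }

final case class MockRelation(table: MockTable, name: String)

object MockTableOf {
  // Irrefutable: always Some, so the extractor itself never filters anything.
  def unapply(r: MockRelation): Option[MockTable] = Some(r.table)
}

object NarrowingDemo extends App {
  def classify(plan: MockRelation): String = plan match {
    // Binds both the whole relation (r) and the narrowed table (t).
    case r @ MockTableOf(t: RowLevelTable) => s"rewrite ${r.name} using $t"
    case MockTableOf(t)                    => s"keep $t as is"
  }

  println(classify(MockRelation(new RowLevelTable, "orders")))
  println(classify(MockRelation(new PlainTable, "logs")))
}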

sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala

Lines changed: 2 additions & 3 deletions
@@ -60,7 +60,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelationWithTable
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, DataSourceV2Table, FileTable}
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.internal.SQLConf
@@ -1733,8 +1733,7 @@ class Dataset[T] private[sql](
         fr.inputFiles
       case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
-      case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, _, _, _),
-        _, _, _, _) =>
+      case DataSourceV2ScanRelation(DataSourceV2Table(table: FileTable), _, _, _, _) =>
         table.fileIndex.inputFiles
     }.flatten
     files.toSet.toArray
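
The Dataset.scala hunk above also shows that the extractor nests inside other patterns: DataSourceV2Table(table: FileTable) sits inside DataSourceV2ScanRelation(...), so the old two-line match fits on one line. A sketch of nested extractors with hypothetical stand-ins (InnerTable, InnerRelation, ScanRel, InnerTableOf):

// Stand-ins, for illustration only; only the nesting mirrors the diff above.
final case class InnerTable(files: Seq[String])
final case class InnerRelation(table: InnerTable, name: String)
final case class ScanRel(relation: InnerRelation, pushedFilters: Seq[String])

object InnerTableOf {
  def unapply(r: InnerRelation): Option[InnerTable] = Some(r.table)
}

object NestedDemo extends App {
  val scan = ScanRel(InnerRelation(InnerTable(Seq("part-0001.parquet")), "events"), Nil)

  scan match {
    // One nested pattern reaches through the scan and the relation to the table.
    case ScanRel(InnerTableOf(t), _) => println(t.files.mkString(", "))
  }
}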

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 2 additions & 2 deletions
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Table, FileTable}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -431,7 +431,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
           case _ => false
         }

-      case DataSourceV2Relation(fileTable: FileTable, _, _, _, _) =>
+      case DataSourceV2Table(fileTable: FileTable) =>
        refreshFileIndexIfNecessary(fileTable.fileIndex, fs, qualifiedPath)

      case _ => false

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala

Lines changed: 2 additions & 2 deletions
@@ -22,7 +22,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Table, FileTable}

 /**
  * Replace the File source V2 table in [[InsertIntoStatement]] to V1 [[FileFormat]].
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File
 class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoStatement(
-        d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _, _) =>
+        d @ DataSourceV2Table(table: FileTable), _, _, _, _, _, _) =>
       val v1FileFormat = table.fallbackFileFormat.getDeclaredConstructor().newInstance()
       val relation = HadoopFsRelation(
         table.fileIndex,

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 2 additions & 2 deletions
@@ -264,7 +264,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
           invalidateCache) :: Nil
       }

-    case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, _,
+    case AppendData(r @ DataSourceV2Table(v1: SupportsWrite), _, _,
        _, Some(write), analyzedQuery) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
      write match {
        case v1Write: V1Write =>
@@ -278,7 +278,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case AppendData(r: DataSourceV2Relation, query, _, _, Some(write), _) =>
       AppendDataExec(planLater(query), refreshCache(r), write) :: Nil

-    case OverwriteByExpression(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, _,
+    case OverwriteByExpression(r @ DataSourceV2Table(v1: SupportsWrite), _, _,
        _, _, Some(write), analyzedQuery) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
      write match {
        case v1Write: V1Write =>
