[SPARK-23203][SQL] make DataSourceV2Relation immutable #20448

Closed · wants to merge 1 commit
@@ -17,20 +17,9 @@

package org.apache.spark.sql.kafka010

-import java.util.Properties
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.scalatest.time.SpanSugar._
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.streaming.StreamExecution
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
-import org.apache.spark.sql.streaming.{StreamTest, Trigger}
-import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.streaming.Trigger

// Run tests in KafkaSourceSuiteBase in continuous execution mode.
class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest
@@ -71,7 +60,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.logical.collectFirst {
-case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
+case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
}.exists { r =>
// Ensure the new topic is present and the old topic is gone.
r.knownPartitions.exists(_.topic == topic2)
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.streaming.Trigger
@@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.logical.collectFirst {
-case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
+case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
}.exists(_.knownPartitions.size == newCount),
s"query never reconfigured to $newCount partitions")
}
@@ -34,7 +34,7 @@ import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkContext
import org.apache.spark.sql.{Dataset, ForeachWriter}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.functions.{count, window}
@@ -117,7 +117,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
} ++ (query.get.lastExecution match {
case null => Seq()
case e => e.logical.collect {
-case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
+case StreamingDataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
}
})
if (sources.isEmpty) {
@@ -37,12 +37,4 @@ public interface SupportsPushDownCatalystFilters extends DataSourceReader {
* Pushes down filters, and returns unsupported filters.
*/
Expression[] pushCatalystFilters(Expression[] filters);

-/**
-* Returns the catalyst filters that are pushed in {@link #pushCatalystFilters(Expression[])}.
-* It's possible that there is no filters in the query and
-* {@link #pushCatalystFilters(Expression[])} is never called, empty array should be returned for
-* this case.
-*/
-Expression[] pushedCatalystFilters();
}
@@ -35,11 +35,4 @@ public interface SupportsPushDownFilters extends DataSourceReader {
* Pushes down filters, and returns unsupported filters.
*/
Filter[] pushFilters(Filter[] filters);

-/**
-* Returns the filters that are pushed in {@link #pushFilters(Filter[])}.
-* It's possible that there is no filters in the query and {@link #pushFilters(Filter[])}
-* is never called, empty array should be returned for this case.
-*/
-Filter[] pushedFilters();
}
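
With the relation made immutable, the pushed*Filters() accessors removed above are no longer needed: the pushed predicates are recorded on the plan node itself (the new filters field of DataSourceV2Relation further down in this diff) instead of being read back from a stateful reader. A minimal, self-contained sketch of that bookkeeping, using simplified stand-in types rather than the real Spark classes:

```scala
// Simplified stand-in types for illustration only; not the real Spark classes.
object PushdownSketch extends App {
  final case class Relation(source: String, filters: Set[String])

  // Pushdown returns a new relation that remembers what was pushed, plus the residual
  // predicates Spark must still evaluate, instead of mutating state inside a reader.
  def pushDown(
      relation: Relation,
      candidates: Set[String],
      supported: String => Boolean): (Relation, Set[String]) = {
    val pushed = candidates.filter(supported)
    (relation.copy(filters = relation.filters ++ pushed), candidates -- pushed)
  }

  val (rel, residual) =
    pushDown(Relation("kafka", Set.empty), Set("a > 1", "b = 2"), _.startsWith("a"))
  println(rel)      // Relation(kafka,Set(a > 1)): the plan carries the pushed filters
  println(residual) // Set(b = 2): still evaluated by Spark after the scan
}
```
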
@@ -185,7 +185,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-val ds = cls.newInstance()
+val ds = cls.newInstance().asInstanceOf[DataSourceV2]
val options = new DataSourceOptions((extraOptions ++
DataSourceV2Utils.extractSessionConfigs(
ds = ds.asInstanceOf[DataSourceV2],
@@ -217,7 +217,8 @@
if (reader == null) {
loadV1Source(paths: _*)
} else {
-Dataset.ofRows(sparkSession, DataSourceV2Relation(reader))
+Dataset.ofRows(sparkSession,
+DataSourceV2Relation(ds, reader.readSchema(), options, userSpecifiedSchema))
}
} else {
loadV1Source(paths: _*)
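
For context, a hedged usage sketch of the read path this change touches. The format name my.example.V2Source is hypothetical and assumes some v2 source implementing ReadSupport is on the classpath; the point is that load() now builds an immutable DataSourceV2Relation from the source instance, the reader schema, the options, and the optional user-specified schema, rather than wrapping a DataSourceReader directly.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructType}

object V2ReadSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("v2-read-sketch").getOrCreate()

  // The options become DataSourceOptions and are carried by the relation.
  val df = spark.read
    .format("my.example.V2Source")   // hypothetical v2 source
    .option("path", "/tmp/data")
    .load()

  // With a user-specified schema, the relation keeps it so that an equivalent reader
  // can be re-created later (see DataSourceV2Relation.reader in this diff).
  val withSchema = spark.read
    .format("my.example.V2Source")
    .schema(new StructType().add("i", IntegerType))
    .load()

  df.explain()
  spark.stop()
}
```
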

This file was deleted.

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import java.util.Objects

import org.apache.commons.lang3.StringUtils

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.util.Utils

/**
* A base class for data source v2 related query plans (both logical and physical). It defines the
* equals/hashCode methods according to some common information.
*/
trait DataSourceV2QueryPlan {

def output: Seq[Attribute]
def sourceClass: Class[_ <: DataSourceV2]
def filters: Set[Expression]

// The metadata of this data source relation that can be used for equality test.
private def metadata: Seq[Any] = Seq(output, sourceClass, filters)

def canEqual(other: Any): Boolean

override def equals(other: Any): Boolean = other match {
case other: DataSourceV2QueryPlan =>
canEqual(other) && metadata == other.metadata
case _ => false
}

override def hashCode(): Int = {
metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
}

def metadataString: String = {
val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
if (filters.nonEmpty) entries += "PushedFilter" -> filters.mkString("[", ", ", "]")

val outputStr = Utils.truncatedString(output, "[", ", ", "]")
val entriesStr = Utils.truncatedString(entries.map {
case (key, value) => key + ": " + StringUtils.abbreviate(redact(value), 100)
}, " (", ", ", ")")

s"${sourceClass.getSimpleName}$outputStr$entriesStr"
}

private def redact(text: String): String = {
Utils.redact(SQLConf.get.stringRedationPattern, text)
}
}
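
To make the equality contract above concrete, here is a self-contained sketch over simplified stand-in types (not the real Spark classes): two plan nodes compare equal exactly when their output, source class, and pushed filters match, independent of any reader instance they might hold. The same metadata also feeds metadataString, which is what the simpleString/explain text of the new nodes is built from.

```scala
import java.util.Objects

object EqualitySketch extends App {
  // Stand-in for DataSourceV2QueryPlan: equals/hashCode derive from a metadata
  // sequence rather than from object identity or a mutable reader.
  trait MetadataEquality {
    def metadata: Seq[Any]
    def canEqual(other: Any): Boolean
    override def equals(other: Any): Boolean = other match {
      case o: MetadataEquality => canEqual(o) && metadata == o.metadata
      case _ => false
    }
    override def hashCode(): Int =
      metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
  }

  // Stand-in for a relation/scan node: output column names, source class, filters.
  case class FakeRelation(output: Seq[String], sourceClass: Class[_], filters: Set[String])
    extends MetadataEquality {
    override def metadata: Seq[Any] = Seq(output, sourceClass, filters)
    override def canEqual(other: Any): Boolean = other.isInstanceOf[FakeRelation]
  }

  val a = FakeRelation(Seq("i", "j"), classOf[String], Set("i > 1"))
  val b = FakeRelation(Seq("i", "j"), classOf[String], Set("i > 1"))
  println(a == b)                           // true: same metadata
  println(a == b.copy(filters = Set.empty)) // false: different pushed filters
  println(a.hashCode == b.hashCode)         // true: consistent with equals
}
```
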
@@ -17,36 +17,84 @@

package org.apache.spark.sql.execution.datasources.v2

-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.types.StructType

+/**
+* A logical plan representing a data source relation, which will be planned to a data scan
+* operator finally.
+*
+* @param output The output of this relation.
+* @param source The instance of a data source v2 implementation.
+* @param options The options specified for this scan, used to create the `DataSourceReader`.
+* @param userSpecifiedSchema The user specified schema, used to create the `DataSourceReader`.
+* @param filters The predicates which are pushed and handled by this data source.
+* @param existingReader A mutable reader carrying some temporary stats during optimization and
+* planning. It's always None before optimization, and does not take part in
+* the equality of this plan, which means this plan is still immutable.
+*/
case class DataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder {
+output: Seq[AttributeReference],
+source: DataSourceV2,
+options: DataSourceOptions,
+userSpecifiedSchema: Option[StructType],
+filters: Set[Expression],
+existingReader: Option[DataSourceReader]) extends LeafNode with DataSourceV2QueryPlan {
Review comment (Member): Why does this plan not extend MultiInstanceRelation?

Review comment (Member): Could you add a test for self join? Just to ensure it still works.

Reply (Contributor Author): Good catch! Yes, this is a bug, but to respect the rule about solving different issues in different PRs, I'd like to fix it in a new PR.

+override def references: AttributeSet = AttributeSet.empty

+override def sourceClass: Class[_ <: DataSourceV2] = source.getClass

override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]

+def reader: DataSourceReader = existingReader.getOrElse {
+(source, userSpecifiedSchema) match {
+case (ds: ReadSupportWithSchema, Some(schema)) =>
+ds.createReader(schema, options)

+case (ds: ReadSupport, None) =>
+ds.createReader(options)

+case (ds: ReadSupport, Some(schema)) =>
+val reader = ds.createReader(options)
+// Sanity check, this should be guaranteed by `DataFrameReader.load`
+assert(reader.readSchema() == schema)
+reader

+case _ => throw new IllegalStateException()
+}
+}

Review comment (Member): Do we need to override a def doCanonicalize?

Review comment (Member): What is the output of this node in Explain?

Review comment (Member): What is the behavior we expect when users call REFRESH TABLE? Another potential issue is storing the statistics in the external catalog: do we still have the issues previously discussed in #14712?

Reply (Contributor Author): Data source v2 doesn't support tables yet, so we don't have this problem now.

override def computeStats(): Statistics = reader match {
case r: SupportsReportStatistics =>
Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
case _ =>
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}

override def simpleString: String = s"Relation $metadataString"
}

/**
* A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
* to the non-streaming relation.
*/
-class StreamingDataSourceV2Relation(
-fullOutput: Seq[AttributeReference],
-reader: DataSourceReader) extends DataSourceV2Relation(fullOutput, reader) {
+case class StreamingDataSourceV2Relation(
Review comment (Contributor Author): Similar to LogicalRelation, I think we can simply add an isStream parameter to DataSourceV2Relation. This can be addressed in a follow-up PR.

+output: Seq[AttributeReference],
+reader: DataSourceReader) extends LeafNode {
override def isStreaming: Boolean = true
}

object DataSourceV2Relation {
-def apply(reader: DataSourceReader): DataSourceV2Relation = {
-new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
+def apply(
+source: DataSourceV2,
+schema: StructType,
+options: DataSourceOptions,
+userSpecifiedSchema: Option[StructType]): DataSourceV2Relation = {
+DataSourceV2Relation(
+schema.toAttributes, source, options, userSpecifiedSchema, Set.empty, None)
}
}
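
The reader method above re-creates a DataSourceReader on demand from the immutable fields, so nothing in the relation has to be mutated during analysis or optimization. A hedged restatement of that decision table as a standalone helper (the object and helper names are ours; the interfaces are the 2.3-era v2 API shown in this diff):

```scala
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, ReadSupportWithSchema}
import org.apache.spark.sql.sources.v2.reader.DataSourceReader
import org.apache.spark.sql.types.StructType

object ReaderCreationSketch {
  def createReader(
      source: DataSourceV2,
      options: DataSourceOptions,
      userSpecifiedSchema: Option[StructType]): DataSourceReader =
    (source, userSpecifiedSchema) match {
      // A source that accepts a schema gets the user-specified one directly.
      case (ds: ReadSupportWithSchema, Some(schema)) =>
        ds.createReader(schema, options)
      // Plain ReadSupport and no user schema: the source infers its own schema.
      case (ds: ReadSupport, None) =>
        ds.createReader(options)
      // Plain ReadSupport plus a user schema is only legal when the schemas agree,
      // which DataFrameReader.load is expected to have checked already.
      case (ds: ReadSupport, Some(schema)) =>
        val reader = ds.createReader(options)
        assert(reader.readSchema() == schema, "source ignored the user-specified schema")
        reader
      case _ =>
        throw new IllegalStateException(s"${source.getClass.getName} cannot create a reader")
    }
}
```
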
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.v2

import scala.collection.JavaConverters._
+import scala.language.existentials

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
@@ -27,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader
import org.apache.spark.sql.types.StructType
@@ -35,13 +37,17 @@ import org.apache.spark.sql.types.StructType
* Physical plan node for scanning data from a data source.
*/
case class DataSourceV2ScanExec(
-fullOutput: Seq[AttributeReference],
-@transient reader: DataSourceReader)
-extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
+output: Seq[AttributeReference],
+@transient reader: DataSourceReader,
+@transient sourceClass: Class[_ <: DataSourceV2],
+@transient filters: Set[Expression])
+extends LeafExecNode with DataSourceV2QueryPlan with ColumnarBatchScan {

+override def references: AttributeSet = AttributeSet.empty

override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]

-override def producedAttributes: AttributeSet = AttributeSet(fullOutput)
+override def simpleString: String = s"Scan $metadataString"

override def outputPartitioning: physical.Partitioning = reader match {
case s: SupportsReportPartitioning =>
@@ -23,8 +23,12 @@ import org.apache.spark.sql.execution.SparkPlan

object DataSourceV2Strategy extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-case DataSourceV2Relation(output, reader) =>
-DataSourceV2ScanExec(output, reader) :: Nil
+case relation: DataSourceV2Relation =>
+DataSourceV2ScanExec(
+relation.output,
+relation.reader,
+relation.sourceClass,
+relation.filters) :: Nil

case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
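
Finally, a hedged end-to-end sketch of how the pieces fit together at the API surface, reusing the hypothetical my.example.V2Source format from the earlier sketch: after load(), the analyzed plan should contain a DataSourceV2Relation, and the executed plan a DataSourceV2ScanExec produced by the strategy above with the same source class.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}

object PlanInspectionSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("plan-inspection").getOrCreate()

  // Hypothetical v2 source; substitute any ReadSupport implementation on the classpath.
  val df = spark.read.format("my.example.V2Source").load()

  // Collect the new logical and physical nodes from the query execution.
  val relations = df.queryExecution.analyzed.collect { case r: DataSourceV2Relation => r }
  val scans = df.queryExecution.executedPlan.collect { case s: DataSourceV2ScanExec => s }

  assert(relations.nonEmpty, "expected a DataSourceV2Relation in the analyzed plan")
  assert(scans.forall(s => relations.exists(_.sourceClass == s.sourceClass)))

  spark.stop()
}
```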