[SPARK-52060][SQL] Make OneRowRelationExec node #50849
Conversation
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala (resolved review thread, outdated)
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala (resolved review thread, outdated)
```scala
}

override def simpleString(maxFields: Int): String = {
  s"$nodeName${truncatedString(output, "[", ",", "]", maxFields)}"
}
```
How is this different from the default implementation?
The default implementation returns `Scan OneRowRelation`, while the existing implementation (using RDDScan) returns `Scan OneRowRelation[]`. I figured we shouldn't change this on the off chance that someone is relying on it.
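The bracket difference comes from formatting the (empty) output column list. A minimal, self-contained sketch of the two renderings, approximating Spark's `truncatedString` with plain `Seq.mkString` (an assumption for illustration, not the actual Spark code):

```scala
// Hypothetical illustration (plain Scala, no Spark dependencies):
// Spark's truncatedString is approximated here with Seq.mkString.
object SimpleStringDemo {
  val nodeName = "Scan OneRowRelation"
  val output: Seq[String] = Seq.empty // OneRowRelation produces no columns

  // Default-style rendering: just the node name.
  val defaultStyle: String = nodeName

  // RDDScan-style rendering: node name plus a bracketed output list,
  // which collapses to "[]" when the output is empty.
  val rddScanStyle: String = s"$nodeName${output.mkString("[", ",", "]")}"
}
```

This is why preserving the overridden `simpleString` keeps the historical `Scan OneRowRelation[]` output stable for anyone matching on it.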
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala (resolved review thread, outdated)
There are failures like
which I also see in other PRs such as here, so I think the failures are unrelated.
```scala
private val rdd = session.sparkContext.parallelize(Seq(emptyRow), 1)

override lazy val metrics = Map(
  "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
```
It's always one row, do we need this metric?
```scala
private val emptyRow: InternalRow = InternalRow.empty

private val rdd = session.sparkContext.parallelize(Seq(emptyRow), 1)
```
I think we can do this:

```scala
private val rdd = {
  val proj = UnsafeProjection.create(schema)
  val emptyRow = proj(InternalRow.empty)
  session.sparkContext.parallelize(Seq(emptyRow), 1)
}
```
Then `doExecute()` can just return this RDD.
```scala
override def inputRDD: RDD[InternalRow] = rdd

override protected val createUnsafeProjection: Boolean = true
```
If we do https://github.com/apache/spark/pull/50849/files#r2097521541 , then this can be false.
What changes were proposed in this pull request?

Creates a new OneRowRelationExec node, which is more or less a copy of the RDDScanExec node. We want a dedicated node because it makes it clearer when a one row relation is used, i.e. for patterns like SELECT version().

Why are the changes needed?
This makes it clearer in the code that a one row relation is used and allows us to avoid checking the hard-coded "OneRowRelation" string when pattern matching.
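The pattern-matching benefit can be sketched with simplified, hypothetical stand-ins for the Spark physical plan nodes (these are not the actual Spark classes, just an illustration of the before/after shape):

```scala
// Hypothetical, simplified stand-ins for Spark physical plan nodes,
// illustrating matching on a dedicated node vs. a hard-coded string.
sealed trait SparkPlanLike { def nodeName: String }

final case class RDDScanExecLike(relationName: String) extends SparkPlanLike {
  def nodeName: String = s"Scan $relationName"
}

case object OneRowRelationExecLike extends SparkPlanLike {
  def nodeName: String = "Scan OneRowRelation"
}

object PlanMatching {
  // Before: detecting a one row relation means checking a magic string
  // inside a generic RDD scan node.
  def isOneRowViaString(plan: SparkPlanLike): Boolean = plan match {
    case r: RDDScanExecLike => r.relationName == "OneRowRelation"
    case _                  => false
  }

  // After: the dedicated node type can be matched on directly,
  // with no string comparison.
  def isOneRowViaNode(plan: SparkPlanLike): Boolean = plan match {
    case OneRowRelationExecLike => true
    case _                      => false
  }
}
```

The string-based check silently breaks if the relation name ever changes; the type-based match is checked by the compiler.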
Does this PR introduce any user-facing change?

Yes, the plan will now be OneRowRelationExec rather than RDDScanExec. The plan string should be the same, however.

How was this patch tested?

Added UTs.

Was this patch authored or co-authored using generative AI tooling?