
[SPARK-13432][SQL] add the source file name and line into a generated Java code #11301


Closed · wants to merge 29 commits into from

Commits
3f99740
add the source file name and line into a generated Java code
kiszk Feb 22, 2016
ec0c1a9
commit Expression.scala
kiszk Feb 22, 2016
899bc64
add a message to show the origin of a generated method as possible wh…
kiszk Feb 25, 2016
bf72c13
make ExpressionSet serializable
kiszk Feb 27, 2016
27b13d2
revert changes to create another PR
kiszk Mar 3, 2016
09245eb
revert changes to create another PRsql/core/src/main/scala/org/apache…
kiszk Mar 3, 2016
9731e4e
revert
kiszk Mar 3, 2016
305852a
Add toOriginString()
kiszk Mar 3, 2016
10de448
call toOriginString() in toCommentSafeString()
kiszk Mar 3, 2016
e68f551
call toOriginString() in toCommentSafeString()
kiszk Mar 3, 2016
458db22
add CurrentOrigin when a Column object is created
kiszk Mar 10, 2016
dfbe2df
rebase
kiszk Mar 10, 2016
7b606ab
replace Origin.callSite in Expression in Column with that for DataFra…
kiszk Mar 16, 2016
d9536d4
add callSite in toOriginString() for all subclassses of a TreeNode class
kiszk Mar 18, 2016
9d70b36
rebase
kiszk Apr 10, 2016
05332ed
fix test failure of 'semantic errors' in ErrorParserSuite
kiszk Apr 11, 2016
330521c
Fix issues by avoiding to set Origin in a constructor of Column
kiszk Apr 12, 2016
607e678
addressed a comment by make TreeNode serializable
kiszk Apr 12, 2016
f0b9587
Revert a change. Now, TreeNode is non-serializable
kiszk Apr 12, 2016
0bf3586
Update CurrentOrigin to set origin to LogicalPlan
kiszk Apr 12, 2016
c08d839
stop setting callSite into Origin at parse time
kiszk Apr 12, 2016
7c68e07
add test suites
kiszk Apr 12, 2016
0091eb5
fix compilation error
kiszk Apr 13, 2016
bacfcc6
resolved conflicts
kiszk Apr 21, 2016
4f8772b
fix build error
kiszk Apr 27, 2016
ccdf12d
resolved conflicts
kiszk May 6, 2016
7279489
addressed minor comments
kiszk May 8, 2016
a96bc48
update expected line numbers in test suite
kiszk May 8, 2016
597b732
update testsuite
kiszk Jun 2, 2016
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1317,8 +1317,10 @@ private[spark] object Utils extends Logging {
val isSparkClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined ||
SPARK_SQL_CLASS_REGEX.findFirstIn(className).isDefined
val isScalaClass = className.startsWith(SCALA_CORE_CLASS_PREFIX)
val testClassName = System.getProperty("spark.callstack.testClass")
val isSparkTestSuiteClass = (testClassName != null) && className.startsWith(testClassName)
// If the class is a Spark internal class or a Scala class, then exclude.
isSparkClass || isScalaClass
(isSparkClass || isScalaClass) && !isSparkTestSuiteClass
}

/**
@@ -1328,7 +1330,8 @@ *
*
* @param skipClass Function that is used to exclude non-user-code classes.
*/
def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = {
def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction):
CallSite = {
// Keep crawling up the stack trace until we find the first function not inside of the spark
// package. We track the last (shallowest) contiguous Spark method. This might be an RDD
// transformation, a SparkContext function (such as parallelize), or anything else that leads
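A minimal sketch of how the new spark.callstack.testClass property is meant to be used. Only the property key and Utils.getCallSite come from this diff; the suite below is hypothetical and assumes it lives under an org.apache.spark package, so that without the property its frames would be excluded as "Spark internal".

    package org.apache.spark.sql

    import org.apache.spark.util.Utils

    // Hypothetical suite; illustrative only.
    class CallSiteSketchSuite {
      def currentCallSite(): String = {
        // By default a class under org.apache.spark matches the Spark-class regexes
        // and is skipped. Registering it via the new property keeps its frames,
        // so getCallSite can report the test as the "user" call site.
        System.setProperty("spark.callstack.testClass", getClass.getName)
        try Utils.getCallSite().shortForm
        finally System.clearProperty("spark.callstack.testClass")
      }
    }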
@@ -96,14 +96,14 @@ abstract class Expression extends TreeNode[Expression] {
ctx.subExprEliminationExprs.get(this).map { subExprState =>
// This expression is repeated which means that the code to evaluate it has already been added
// as a function before. In that case, we just re-use it.
ExprCode(ctx.registerComment(this.toString), subExprState.isNull, subExprState.value)
ExprCode(ctx.registerComment(this.toOriginString), subExprState.isNull, subExprState.value)
}.getOrElse {
val isNull = ctx.freshName("isNull")
val value = ctx.freshName("value")
val ve = doGenCode(ctx, ExprCode("", isNull, value))
if (ve.code.nonEmpty) {
// Add `this` in the comment.
ve.copy(code = s"${ctx.registerComment(this.toString)}\n" + ve.code.trim)
ve.copy(code = s"${ctx.registerComment(this.toOriginString)}\n" + ve.code.trim)
} else {
ve
}
@@ -50,7 +50,7 @@ object ExpressionSet {
class ExpressionSet protected(
protected val baseSet: mutable.Set[Expression] = new mutable.HashSet,
protected val originals: mutable.Buffer[Expression] = new ArrayBuffer)
extends Set[Expression] {
extends Set[Expression] with Serializable {
Member:

Yes, constraints in QueryPlan is an ExpressionSet, and SparkPlan, which is a subclass of QueryPlan, is serializable, so strictly speaking ExpressionSet should also be serializable. But constraints is a lazy val and it's not accessed when the receiver object is an instance of SparkPlan. In other words, constraints is accessed only when the receiver object is an instance of LogicalPlan.

Member Author:

Thank you for your explanation of how constraints is accessed. From the implementation point of view, my understanding is that it is still necessary to declare ExpressionSet as Serializable. Is there a better way to enable serialization only for LogicalPlan?

Member:

If ExpressionSet is really serialized only in the case of LogicalPlan, we could move constraints from QueryPlan to LogicalPlan, but I'm not sure that's the correct way.
Have you ever hit a problem because ExpressionSet is not Serializable?

Member Author:

Yes, I got a non-serializable exception in the Hive test suites when ExpressionSet is not Serializable. This is why I added Serializable to ExpressionSet.

Member:

O.K. I got it.


protected def add(e: Expression): Unit = {
if (!baseSet.contains(e.canonicalized)) {
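A tiny, Spark-independent sketch of the failure mode discussed in the review thread above: Java serialization throws NotSerializableException as soon as it reaches a field whose class does not implement Serializable, which is the kind of error reported from the Hive test suites. All names here are stand-ins.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Stand-ins for the real classes; the names are illustrative only.
    class ConstraintSet                              // not Serializable, like ExpressionSet before this change
    class Plan(val constraints: ConstraintSet) extends Serializable

    object SerDeSketch extends App {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      // Throws java.io.NotSerializableException: ConstraintSet, because serialization
      // of Plan reaches a field whose class does not implement Serializable.
      out.writeObject(new Plan(new ConstraintSet))
    }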
@@ -35,7 +35,7 @@ trait CodegenFallback extends Expression {
val idx = ctx.references.length
ctx.references += this
val objectTerm = ctx.freshName("obj")
val placeHolder = ctx.registerComment(this.toString)
val placeHolder = ctx.registerComment(this.toOriginString)
if (nullable) {
ev.copy(code = s"""
$placeHolder
@@ -98,7 +98,7 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
case e: ParseException =>
@sarutak (Member), May 8, 2016:

Just in case, please keep the mode of this file and the other files at 644 instead of 755.

throw e.withCommand(command)
case e: AnalysisException =>
val position = Origin(e.line, e.startPosition)
val position = Origin(None, e.line, e.startPosition)
throw new ParseException(Option(command), e.message, position, position)
}
}
@@ -150,7 +150,7 @@ case object ParseErrorListener extends BaseErrorListener {
charPositionInLine: Int,
msg: String,
e: RecognitionException): Unit = {
val position = Origin(Some(line), Some(charPositionInLine))
val position = Origin(None, Some(line), Some(charPositionInLine))
throw new ParseException(None, msg, position, position)
}
}
@@ -176,7 +176,7 @@ class ParseException(
val builder = new StringBuilder
builder ++= "\n" ++= message
start match {
case Origin(Some(l), Some(p)) =>
case Origin(_, Some(l), Some(p)) =>
builder ++= s"(line $l, pos $p)\n"
command.foreach { cmd =>
val (above, below) = cmd.split("\n").splitAt(l)
@@ -74,7 +74,7 @@ object ParserUtils {

/** Get the origin (line and position) of the token. */
def position(token: Token): Origin = {
Origin(Option(token.getLine), Option(token.getCharPositionInLine))
Origin(None, Option(token.getLine), Option(token.getCharPositionInLine))
}

/** Assert if a condition holds. If it doesn't throw a parse exception. */
@@ -41,6 +41,7 @@ import org.apache.spark.util.Utils
private class MutableInt(var i: Int)

case class Origin(
var callSite: Option[String] = None,
line: Option[Int] = None,
startPosition: Option[Int] = None)

@@ -58,15 +59,15 @@ object CurrentOrigin {

def reset(): Unit = value.set(Origin())

def setPosition(line: Int, start: Int): Unit = {
def setPosition(callSite: String, line: Int, start: Int): Unit = {
value.set(
value.get.copy(line = Some(line), startPosition = Some(start)))
value.get.copy(callSite = Some(callSite), line = Some(line), startPosition = Some(start)))
}

def withOrigin[A](o: Origin)(f: => A): A = {
val current = get
set(o)
val ret = try f finally { reset() }
reset()
val ret = try f finally { set(current) }
Member:

It might be a correct change, but I noticed that after this change we have another issue when we operate on a DataFrame using both the DSL-like API and SQL/HiveQL.

For example, if we have the following code and run it:

    val df = sc.parallelize(1 to 10).toDF
    val filtered = df.filter("_1 > 4")
    val selected = filtered.select($"_1" * 10)
    selected.show()

Then we get generated code like the following.

...

/* 055 */       while (rdd_batchIdx < numRows) {
/* 056 */         InternalRow rdd_row = rdd_batch.getRow(rdd_batchIdx++);
/* 057 */         /* input[0, int] */
/* 058 */         boolean rdd_isNull = rdd_row.isNullAt(0);
/* 059 */         int rdd_value = rdd_isNull ? -1 : (rdd_row.getInt(0));
/* 060 */         /* (input[0, int] > 4) @ filter at SPARK13432.scala:14 */
/* 061 */         boolean filter_isNull = true;
/* 062 */         boolean filter_value = false;
/* 063 */         
/* 064 */         if (!rdd_isNull) {
/* 065 */           filter_isNull = false; // resultCode could change nullability.
/* 066 */           filter_value = rdd_value > 4;
/* 067 */           
/* 068 */         }
/* 069 */         if (!filter_isNull && filter_value) {
/* 070 */           filter_metricValue.add(1);
/* 071 */           
/* 072 */           /* (input[0, int] * 10) @ filter at SPARK13432.scala:14 */
/* 073 */           boolean project_isNull = true;
/* 074 */           int project_value = -1;

...

At line 72, the call site should not be filter, and the line of the original code is not 14.
I think the comment should just say /* (input[0, int] * 10) */.

This issue is because origin is not reset properly.

Member Author:

I also added support for Column, which is used in the DSL. The latest code generates a comment /* (input[0, int] * 10) @ $times at SPARK13432.scala:15 */.
Is this fine with you?

Member:

Thanks for the change. Ideally, the real call site should be displayed. In the case above, the comment should be displayed like /* (input[0, int] * 10) @ select at SPARK13432.scala:15 */.

Member Author:

Ideally, I agree with you. As you can imagine, this happens because the argument $"_1" * 10 is evaluated before the real call site select is called; thus $times for * is recorded instead of select.
Should we replace the content of origin in the Column with the real call site (e.g. select) when it is called?

Member:

I think it should be replaced with the real call site, but if that's difficult, how about just resetting the origin and leaving a TODO for future work?

Member Author:

This change can generate a comment for Column with the real call site.

/* 120 */     while (!shouldStop() && (firstRow || rdd_input.hasNext())) {
/* 121 */       if (firstRow) {
/* 122 */         firstRow = false;
/* 123 */       } else {
/* 124 */         rdd_row = (InternalRow) rdd_input.next();
/* 125 */       }
/* 126 */       rdd_metricValue.add(1);
/* 127 */       /*** CONSUME: Filter (isnotnull(_1#0) && (_1#0 > 4)) */
/* 128 */       /* input[0, int] [org.apache.spark.sql.catalyst.expressions.BoundReference] @ filter at DF.scala:16 */
/* 129 */       boolean project_isNull4 = rdd_row.isNullAt(0);
/* 130 */       int project_value4 = project_isNull4 ? -1 : (rdd_row.getInt(0));
/* 131 */       
/* 132 */       /* (isnotnull(input[0, int]) && (input[0, int] > 4)) [org.apache.spark.sql.catalyst.expressions.And] @ select at DF.scala:17 */
/* 133 */       boolean filter_isNull6 = false;
/* 134 */       boolean filter_value6 = false;
/* 135 */       
/* 136 */       if (!false && !(!(project_isNull4))) {
/* 137 */       } else {
/* 138 */         /* (input[0, int] > 4) [org.apache.spark.sql.catalyst.expressions.GreaterThan] @ filter at DF.scala:16 */
/* 139 */         boolean filter_isNull9 = true;
/* 140 */         boolean filter_value9 = false;
/* 141 */         
/* 142 */         if (!project_isNull4) {
/* 143 */           filter_isNull9 = false; // resultCode could change nullability.
/* 144 */           filter_value9 = project_value4 > 4;
/* 145 */           
/* 146 */         }
/* 147 */         if (!filter_isNull9 && !filter_value9) {
/* 148 */         } else if (!false && !filter_isNull9) {
/* 149 */           filter_value6 = true;
/* 150 */         } else {
/* 151 */           filter_isNull6 = true;
/* 152 */         }
/* 153 */       }
/* 154 */       if (!(!filter_isNull6 && filter_value6)) continue;
/* 155 */       filter_metricValue1.add(1);
/* 156 */       
/* 157 */       /*** CONSUME: Project [(_1#0 * 10) AS (_1 * 10)#1] */
/* 158 */       
/* 159 */       /* (input[0, int] * 10) [org.apache.spark.sql.catalyst.expressions.Multiply] @ select at DF.scala:17 */
/* 160 */       boolean project_isNull5 = true;
/* 161 */       int project_value5 = -1;

ret
}
}
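A small sketch of why the new withOrigin body saves and restores the previous origin instead of unconditionally resetting it: with nested calls, the outer origin must still be in effect after the inner block finishes. It assumes the Origin signature introduced in this PR (callSite as the first field); the call-site strings are illustrative.

    import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}

    object WithOriginSketch extends App {
      val outer = Origin(callSite = Some("select at App.scala:10"))
      val inner = Origin(callSite = Some("filter at App.scala:11"))

      CurrentOrigin.withOrigin(outer) {
        CurrentOrigin.withOrigin(inner) {
          assert(CurrentOrigin.get.callSite.contains("filter at App.scala:11"))
        }
        // With the old `finally { reset() }` the origin would now be empty;
        // `finally { set(current) }` restores the outer origin instead.
        assert(CurrentOrigin.get.callSite.contains("select at App.scala:10"))
      }
    }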
@@ -442,6 +443,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {

override def toString: String = treeString

def toOriginString: String =
if (this.origin.callSite.isDefined) {
this.toString + " @ " + this.origin.callSite.get
} else {
this.toString
}

/** Returns a string representation of the nodes in this tree */
def treeString: String = generateTreeString(0, Nil, new StringBuilder).toString
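A small sketch of the new setPosition/toOriginString pair, mirroring the "preserves origin" test further down; the call-site string and the printed output are illustrative only.

    import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
    import org.apache.spark.sql.catalyst.trees.CurrentOrigin

    object ToOriginStringSketch extends App {
      // Nodes created while an origin is set capture it, as in the test suite below.
      CurrentOrigin.setPosition("filter at Sketch.scala:10", 1, 1)
      val add = Add(Literal(1), Literal(2))
      CurrentOrigin.reset()

      println(add.toString)        // roughly: (1 + 2)
      println(add.toOriginString)  // roughly: (1 + 2) @ filter at Sketch.scala:10
    }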

@@ -155,6 +155,16 @@ package object util {

def toPrettySQL(e: Expression): String = usePrettyExpression(e).sql

/**
* Returns the string representation of this expression that is safe to be put in
* code comments of generated code. The length is capped at 128 characters.
*/
def toCommentSafeString(str: String): String = {
val len = math.min(str.length, 128)
val suffix = if (str.length > len) "..." else ""
str.substring(0, len).replace("*/", "\\*\\/").replace("\\u", "\\\\u") + suffix
}

/* FIX ME
implicit class debugLogging(a: Any) {
def debugLogging() {
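A short sketch of the escaping and truncation behavior of toCommentSafeString added above: a closing comment marker or unicode escape inside an expression's text is neutralized so it cannot break out of the generated /* ... */ comment, and anything past 128 characters is dropped with an ellipsis appended. The object name is made up; the expected outputs follow from the implementation.

    import org.apache.spark.sql.catalyst.util.toCommentSafeString

    object CommentSafeSketch extends App {
      // "*/" inside an expression's text would otherwise terminate the generated comment early.
      println(toCommentSafeString("""value */; System.exit(1); /*"""))
      // prints: value \*\/; System.exit(1); /*

      // Anything past 128 characters is dropped and "..." is appended: 128 + 3 = 131.
      println(toCommentSafeString("x" * 200).length)
    }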
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.trees

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.SparkContext
import org.apache.spark.SparkFunSuite
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
@@ -117,18 +119,39 @@ class TreeNodeSuite extends SparkFunSuite {
}

test("preserves origin") {
CurrentOrigin.setPosition(1, 1)
CurrentOrigin.setPosition("TreeNodeSuite.scala:120", 1, 1)
val add = Add(Literal(1), Literal(1))
CurrentOrigin.reset()

val transformed = add transform {
case Literal(1, _) => Literal(2)
}

assert(transformed.origin.callSite.isDefined)
assert(transformed.origin.line.isDefined)
assert(transformed.origin.startPosition.isDefined)
}

test("preserves origin thru SerDe") {
val sc = new SparkContext("local", "test")
val callSite = "TreeNodeSuite.scala:137"
val line = 1
val startPosition = 2
CurrentOrigin.setPosition(callSite, line, startPosition)
val add = Add(Literal(1), Literal(2))

val ser = sc.env.closureSerializer.newInstance()
val serBinary = ser.serialize(add)
val deadd = ser.deserialize[Expression](serBinary, Thread.currentThread.getContextClassLoader)

assert(deadd.origin.callSite.isDefined &&
deadd.origin.callSite.get == callSite)
assert(deadd.origin.line.isDefined &&
deadd.origin.line.get == line)
assert(deadd.origin.startPosition.isDefined &&
deadd.origin.startPosition.get == startPosition)
}

test("foreach up") {
val actual = new ArrayBuffer[String]()
val expected = Seq("1", "2", "3", "4", "-", "*", "+")
12 changes: 12 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -26,10 +26,12 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils.getCallSite

private[sql] object Column {

@@ -46,6 +48,16 @@ private[sql] object Column {
case expr => usePrettyExpression(expr).sql
}
}

@scala.annotation.varargs
def updateExpressionsOrigin(cols: Column*): Unit = {
// Update Expression.origin using the callSite of an operation
val callSite = org.apache.spark.util.Utils.getCallSite().shortForm
cols.map(col => col.expr.foreach(e => e.origin.callSite = Some(callSite)))
// Update CurrentOrigin for setting origin for LogicalPlan node
CurrentOrigin.set(
Origin(Some(callSite), CurrentOrigin.get.line, CurrentOrigin.get.startPosition))
@sarutak (Member), May 8, 2016:

I think it's better not to set CurrentOrigin directly, because the origin state changes globally within the same thread. Otherwise, we have the following problems.

1. Call sites associated with different queries can be scrambled.

For example, let's say we execute the following two queries: one is a filter and the other is an orderBy.

val df = sc.parallelize(1 to 10, 1).toDF
df.filter($"value" + 10 > 4).show        // query1 filter
df.orderBy($"value" + 13).show        // query2 orderBy

Part of the code generated for query2 is as follows.
query2 doesn't have a filter operation, but the call site includes @ filter, which is related to the previous query (query1).

/* 006 */ class SpecificOrdering extends org.apache.spark.sql.catalyst.expressions.codegen.BaseOrdering {
/* 007 */   

...

/* 024 */       /* (input[0, int] + 13) @ orderBy at <console>:27 */
/* 025 */       /* input[0, int] @ filter at <console>:27 */
/* 026 */       int value1 = i.getInt(0);
2. The call site should not be associated with BaseProjection. It's not related to each expression directly.

When we write queries without Column objects, like df.filter("value + 10 > 3"), call sites are not associated with `BaseProjection`.

/* 006 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
/* 007 */   
/* 008 */   private Object[] references;
/* 009 */   private MutableRow mutableRow;
/* 010 */   private Object[] values;
/* 011 */   private org.apache.spark.sql.types.StructType schema;
/* 012 */   
/* 013 */   
/* 014 */   public SpecificSafeProjection(Object[] references) {
/* 015 */     this.references = references;
/* 016 */     mutableRow = (MutableRow) references[references.length - 1];
/* 017 */     
/* 018 */     this.schema = (org.apache.spark.sql.types.StructType) references[0];
/* 019 */   }
/* 020 */   
/* 021 */   public java.lang.Object apply(java.lang.Object _i) {
/* 022 */     InternalRow i = (InternalRow) _i;
/* 023 */     /* createexternalrow(if (isnull(input[0, int])) null else input[0, int], StructField(value,IntegerType,false)) */
/* 024 */     values = new Object[1];
/* 025 */     /* if (isnull(input[0, int])) null else input[0, int] */
/* 026 */     /* isnull(input[0, int]) */
/* 027 */     /* input[0, int] */
/* 028 */     int value3 = i.getInt(0);
/* 029 */     boolean isNull1 = false;
/* 030 */     int value1 = -1;
/* 031 */     if (!false && false) {
/* 032 */       /* null */
/* 033 */       final int value4 = -1;
/* 034 */       isNull1 = true;
/* 035 */       value1 = value4;
/* 036 */     } else {
/* 037 */       /* input[0, int] */
/* 038 */       int value5 = i.getInt(0);
/* 039 */       isNull1 = false;
/* 040 */       value1 = value5;
/* 041 */     }
/* 042 */     if (isNull1) {
/* 043 */       values[0] = null;
/* 044 */     } else {
/* 045 */       values[0] = value1;
/* 046 */     }
/* 047 */     
/* 048 */     final org.apache.spark.sql.Row value = new org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema(values, this.schema);
/* 049 */     if (false) {
/* 050 */       mutableRow.setNullAt(0);
/* 051 */     } else {
/* 052 */       
/* 053 */       mutableRow.update(0, value);
/* 054 */     }
/* 055 */     
/* 056 */     return mutableRow;
/* 057 */   }
/* 058 */ }
/* 059 */ 

But if we write queries with column objects, call sites are associated with BaseProjection.

/* 006 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
/* 007 */   
/* 008 */   private Object[] references;
/* 009 */   private MutableRow mutableRow;
/* 010 */   private Object[] values;
/* 011 */   private org.apache.spark.sql.types.StructType schema;
/* 012 */   
/* 013 */   
/* 014 */   public SpecificSafeProjection(Object[] references) {
/* 015 */     this.references = references;
/* 016 */     mutableRow = (MutableRow) references[references.length - 1];
/* 017 */     
/* 018 */     this.schema = (org.apache.spark.sql.types.StructType) references[0];
/* 019 */   }
/* 020 */   
/* 021 */   public java.lang.Object apply(java.lang.Object _i) {
/* 022 */     InternalRow i = (InternalRow) _i;
/* 023 */     /* createexternalrow(if (isnull(input[0, int])) null else input[0, int], StructField(value,IntegerType,false)) @ filter at <console... */
/* 024 */     values = new Object[1];
/* 025 */     /* if (isnull(input[0, int])) null else input[0, int] @ filter at <console>:27 */
/* 026 */     /* isnull(input[0, int]) @ filter at <console>:27 */
/* 027 */     /* input[0, int] @ filter at <console>:27 */
/* 028 */     int value3 = i.getInt(0);
/* 029 */     boolean isNull1 = false;
/* 030 */     int value1 = -1;
/* 031 */     if (!false && false) {
/* 032 */       /* null @ filter at <console>:27 */
/* 033 */       final int value4 = -1;
/* 034 */       isNull1 = true;
/* 035 */       value1 = value4;
/* 036 */     } else {
/* 037 */       /* input[0, int] @ filter at <console>:27 */
/* 038 */       int value5 = i.getInt(0);
/* 039 */       isNull1 = false;
/* 040 */       value1 = value5;
/* 041 */     }
/* 042 */     if (isNull1) {
/* 043 */       values[0] = null;
/* 044 */     } else {
/* 045 */       values[0] = value1;
/* 046 */     }
/* 047 */     
/* 048 */     final org.apache.spark.sql.Row value = new org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema(values, this.schema);
/* 049 */     if (false) {
/* 050 */       mutableRow.setNullAt(0);
/* 051 */     } else {
/* 052 */       
/* 053 */       mutableRow.update(0, value);
/* 054 */     }
/* 055 */     
/* 056 */     return mutableRow;
/* 057 */   }
/* 058 */ }
/* 059 */ 

Member:

Maybe we can use withOrigin to set CurrentOrigin temporarily.
Also, we may need to create a CurrentOrigin for each expression, because in TreeNode, origin is set by CurrentOrigin.get.
Otherwise, some unrelated expressions share the same origin.

Member Author:

The current version does not use CurrentOrigin.set to update TreeNode.origin. The CurrentOrigin.set at https://github.com/apache/spark/pull/11301/files#diff-ac415c903887e49486ba542a65eec980R49 updates TreeNode.origin for the LogicalPlan.

For the first problem, I am afraid that the two queries filter and orderBy share one unique Expression for $"_1". If one Expression is shared, we have to keep an origin per expression tree for each query and set it into the Expressions of the tree at code generation time.

For the second problem, I expected the same behavior. I will investigate it.

Member Author:

Sorry, I have been too busy and did not have time to work on this PR.

My current idea is to set CurrentOrigin at SQLExecution.withNewExecutionId, and then have TreeNode.toOriginString dump CurrentOrigin.get.callSite.

My next step is to pass the callSite from an operation such as filter through the Dataset to the LogicalPlan, so that it can be accessed at SQLExecution.withNewExecutionId.

}
}

/**
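A rough end-to-end sketch of the intended effect of updateExpressionsOrigin once the Dataset operators below call it: the filter condition's expression tree should carry the caller's call site. The object name, data, and exact call-site string are illustrative and depend on how the PR finally records the origin.

    import org.apache.spark.sql.SparkSession

    object UpdateOriginSketch extends App {
      val spark = SparkSession.builder().master("local").appName("origin-sketch").getOrCreate()
      import spark.implicits._

      val df = spark.sparkContext.parallelize(1 to 10).toDF()
      // filter(condition: Column) calls Column.updateExpressionsOrigin(condition) internally,
      // so every node of ($"value" > 4) should get callSite = Some("filter at UpdateOriginSketch.scala:NN").
      val filtered = df.filter($"value" > 4)

      // Expected to print something like: List(Some(filter at UpdateOriginSketch.scala:12))
      println(filtered.queryExecution.logical.expressions.map(_.origin.callSite))
      spark.stop()
    }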
18 changes: 17 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -683,6 +683,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame = {
Column.updateExpressionsOrigin(joinExprs)
// Note that in this function, we introduce a hack in the case of self-join to automatically
// resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
// Consider this case: df.join(df, df("key") === df("key"))
@@ -967,6 +968,7 @@
*/
@scala.annotation.varargs
def select(cols: Column*): DataFrame = withPlan {
Column.updateExpressionsOrigin(cols : _*)
Project(cols.map(_.named), logicalPlan)
}

@@ -1111,6 +1113,7 @@
* @since 1.6.0
*/
def filter(condition: Column): Dataset[T] = withTypedPlan {
Column.updateExpressionsOrigin(condition)
Filter(condition.expr, logicalPlan)
}

@@ -1173,6 +1176,7 @@
*/
@scala.annotation.varargs
def groupBy(cols: Column*): RelationalGroupedDataset = {
Column.updateExpressionsOrigin(cols : _*)
RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.GroupByType)
}

@@ -1197,6 +1201,7 @@
*/
@scala.annotation.varargs
def rollup(cols: Column*): RelationalGroupedDataset = {
Column.updateExpressionsOrigin(cols : _*)
RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.RollupType)
}

@@ -1221,6 +1226,7 @@
*/
@scala.annotation.varargs
def cube(cols: Column*): RelationalGroupedDataset = {
Column.updateExpressionsOrigin(cols : _*)
RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.CubeType)
}

@@ -1419,7 +1425,10 @@
* @since 2.0.0
*/
@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs : _*)
def agg(expr: Column, exprs: Column*): DataFrame = {
Column.updateExpressionsOrigin(exprs : _*)
groupBy().agg(expr, exprs : _*)
}

/**
* Returns a new [[Dataset]] by taking the first `n` rows. The difference between this function
@@ -1608,6 +1617,7 @@
*/
@deprecated("use flatMap() or select() with functions.explode() instead", "2.0.0")
def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
Column.updateExpressionsOrigin(input : _*)
val elementSchema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]

val convert = CatalystTypeConverters.createToCatalystConverter(elementSchema)
@@ -1671,6 +1681,7 @@
* @since 2.0.0
*/
def withColumn(colName: String, col: Column): DataFrame = {
Column.updateExpressionsOrigin(col)
val resolver = sparkSession.sessionState.analyzer.resolver
val output = queryExecution.analyzed.output
val shouldReplace = output.exists(f => resolver(f.name, colName))
@@ -1692,6 +1703,7 @@
* Returns a new [[Dataset]] by adding a column with metadata.
*/
private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = {
Column.updateExpressionsOrigin(col)
val resolver = sparkSession.sessionState.analyzer.resolver
val output = queryExecution.analyzed.output
val shouldReplace = output.exists(f => resolver(f.name, colName))
@@ -1782,6 +1794,7 @@
* @since 2.0.0
*/
def drop(col: Column): DataFrame = {
Column.updateExpressionsOrigin(col)
val expression = col match {
case Column(u: UnresolvedAttribute) =>
queryExecution.analyzed.resolveQuoted(
@@ -2218,6 +2231,7 @@
*/
@scala.annotation.varargs
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
Column.updateExpressionsOrigin(partitionExprs : _*)
RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
}

@@ -2233,6 +2247,7 @@
*/
@scala.annotation.varargs
def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
Column.updateExpressionsOrigin(partitionExprs : _*)
RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
}

@@ -2528,6 +2543,7 @@
}

private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
Column.updateExpressionsOrigin(sortExprs : _*)
val sortOrder: Seq[SortOrder] = sortExprs.map { col =>
col.expr match {
case expr: SortOrder =>