Skip to content

Commit b932e86

Browse files
committed
Added eager analysis for error reporting.
1 parent e6f00b8 commit b932e86

File tree

4 files changed

+33
-8
lines changed

(summary repeated by page extraction: 4 files changed, +33 −8 lines changed)

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType
3131

3232
private[sql] object DataFrame {
3333
def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
34-
new DataFrameImpl(sqlContext, logicalPlan)
34+
new DataFrameImpl(sqlContext, sqlContext.executePlan(logicalPlan))
3535
}
3636
}
3737

sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,22 +41,28 @@ import org.apache.spark.util.Utils
4141

4242

4343
/**
44-
* Implementation for [[DataFrame]]. Refer to [[DataFrame]] directly for documentation.
44+
* See [[DataFrame]] for documentation.
4545
*/
46-
class DataFrameImpl protected[sql](
46+
private[sql] class DataFrameImpl protected[sql](
4747
override val sqlContext: SQLContext,
48-
private val baseLogicalPlan: LogicalPlan)
48+
val queryExecution: SQLContext#QueryExecution)
4949
extends DataFrame {
5050

51-
@transient override lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
51+
def this(sqlContext: SQLContext, logicalPlan: LogicalPlan) = {
52+
this(sqlContext, {
53+
val qe = sqlContext.executePlan(logicalPlan)
54+
qe.analyzed // This should force analysis and throw errors if there are any
55+
qe
56+
})
57+
}
5258

53-
@transient protected[sql] override val logicalPlan: LogicalPlan = baseLogicalPlan match {
59+
@transient protected[sql] override val logicalPlan: LogicalPlan = queryExecution.logical match {
5460
// For various commands (like DDL) and queries with side effects, we force query optimization to
5561
// happen right away to let these side effects take place eagerly.
5662
case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToFile =>
5763
LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
5864
case _ =>
59-
baseLogicalPlan
65+
queryExecution.logical
6066
}
6167

6268
/**

sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@ class ColumnExpressionSuite extends QueryTest {
2929

3030
test("computability check") {
3131
def shouldBeComputable(c: Column): Unit = assert(c.isComputable === true)
32-
def shouldNotBeComputable(c: Column): Unit = assert(c.isComputable === false)
32+
33+
def shouldNotBeComputable(c: Column): Unit = {
34+
assert(c.isComputable === false)
35+
intercept[UnsupportedOperationException] { c.head() }
36+
}
3337

3438
shouldBeComputable(testData2("a"))
3539
shouldBeComputable(testData2("b"))

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,21 @@ import scala.language.postfixOps
2828
class DataFrameSuite extends QueryTest {
2929
import org.apache.spark.sql.TestData._
3030

31+
test("analysis error should be eagerly reported") {
32+
intercept[Exception] { testData.select('nonExistentName) }
33+
intercept[Exception] {
34+
testData.groupBy('key).agg(Map("nonExistentName" -> "sum"))
35+
}
36+
intercept[Exception] {
37+
testData.groupBy("nonExistentName").agg(Map("key" -> "sum"))
38+
}
39+
40+
// Uncomment the following once we report the errors properly.
41+
// intercept[Exception] {
42+
// testData.groupBy("nonExistentName").agg(Map("key" -> "sum"))
43+
// }
44+
}
45+
3146
test("table scan") {
3247
checkAnswer(
3348
testData,

Comments (0) — no commit comments.