From d59f6a709586ff0d1bfbfda50c4e4cf17d5a50ff Mon Sep 17 00:00:00 2001 From: ulysses Date: Fri, 30 Oct 2020 08:18:10 +0000 Subject: [PATCH] [SPARK-33294][SQL] Add query resolved check before analyze InsertIntoDir ### What changes were proposed in this pull request? Add a `query.resolved` check before analyzing `InsertIntoDir`. ### Why are the changes needed? For a better error message. ``` INSERT OVERWRITE DIRECTORY '/tmp/file' USING PARQUET SELECT * FROM ( SELECT c3 FROM ( SELECT c1, c2 from values(1,2) t(c1, c2) ) ) ``` Before this PR, we get the following error message: ``` org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to toAttribute on unresolved object, tree: * at org.apache.spark.sql.catalyst.analysis.Star.toAttribute(unresolved.scala:244) at org.apache.spark.sql.catalyst.plans.logical.Project$$anonfun$output$1.apply(basicLogicalOperators.scala:52) at org.apache.spark.sql.catalyst.plans.logical.Project$$anonfun$output$1.apply(basicLogicalOperators.scala:52) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:392) ``` ### Does this PR introduce _any_ user-facing change? Yes, the error message changed. ### How was this patch tested? New test. Closes #30197 from ulysses-you/SPARK-33294. 
Authored-by: ulysses Signed-off-by: Wenchen Fan --- .../datasources/DataSourceStrategy.scala | 4 ++-- .../apache/spark/sql/sources/InsertSuite.scala | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 02dd4e549f93b..b1600a639a9bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -163,8 +163,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] with CastSupport { InsertIntoDataSourceCommand(l, query, overwrite) case InsertIntoDir(_, storage, provider, query, overwrite) - if provider.isDefined && provider.get.toLowerCase(Locale.ROOT) != DDLUtils.HIVE_PROVIDER => - + if query.resolved && provider.isDefined && + provider.get.toLowerCase(Locale.ROOT) != DDLUtils.HIVE_PROVIDER => val outputPath = new Path(storage.locationUri.get) if (overwrite) DDLUtils.verifyNotReadPath(query, outputPath) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 9b5466e8a68f1..4686a0c69de63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -896,6 +896,23 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'' AS part2") } } + + test("SPARK-33294: Add query resolved check before analyze InsertIntoDir") { + withTempPath { path => + val msg = intercept[AnalysisException] { + sql( + s""" + |INSERT OVERWRITE DIRECTORY '${path.getAbsolutePath}' USING PARQUET + |SELECT * FROM ( + | SELECT c3 FROM ( + | SELECT c1, 
c2 from values(1,2) t(c1, c2) + | ) + |) + """.stripMargin) + }.getMessage + assert(msg.contains("cannot resolve '`c3`' given input columns")) + } + } } class FileExistingTestFileSystem extends RawLocalFileSystem {