
[SPARK-17230] [SQL] Should not pass optimized query into QueryExecution in DataFrameWriter #14797

Closed · wants to merge 4 commits

@@ -37,7 +37,7 @@ import org.apache.spark.sql.types._
  */
 trait RunnableCommand extends LogicalPlan with logical.Command {
   override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
+  final override def children: Seq[LogicalPlan] = Seq.empty
   def run(sparkSession: SparkSession): Seq[Row]
 }
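
To see why making `children` final and empty matters, here is a minimal, self-contained sketch with toy types (`Plan`, `Scan`, and `WriteCommand` are illustrative stand-ins, not Spark's actual TreeNode API): rule-driven transforms recurse only into `children`, while `innerChildren` exists for tree-string display, so a query held as an inner child is never rewritten again.

```scala
// Toy model of the children vs. innerChildren distinction.
sealed trait Plan {
  def children: Seq[Plan] = Seq.empty
  def innerChildren: Seq[Plan] = Seq.empty
  def withChildren(newChildren: Seq[Plan]): Plan = this
  // Apply a rewrite rule to this node, then recurse into `children` only.
  def transformDown(rule: PartialFunction[Plan, Plan]): Plan = {
    val applied = rule.applyOrElse(this, identity[Plan])
    applied.withChildren(applied.children.map(_.transformDown(rule)))
  }
}

case class Scan(table: String) extends Plan

// Mirrors RunnableCommand after this PR: the query is an inner child, and
// `children` is final and empty, so no rule can descend into the query.
case class WriteCommand(query: Plan) extends Plan {
  final override def children: Seq[Plan] = Seq.empty
  override def innerChildren: Seq[Plan] = Seq(query)
}

object Demo extends App {
  val cmd = WriteCommand(Scan("t"))
  // The rewrite never reaches the inner query: an already-optimized query
  // held by the command is not transformed a second time.
  val rewritten = cmd.transformDown { case Scan(n) => Scan(n.toUpperCase) }
  assert(rewritten == WriteCommand(Scan("t")))
}
```

In Spark the same property means the analyzer and optimizer leave a RunnableCommand's embedded query alone, which is exactly what this PR needs for plans that have already been optimized.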

@@ -132,7 +132,7 @@ case class CreateDataSourceTableAsSelectCommand(
     query: LogicalPlan)
   extends RunnableCommand {
 
-  override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+  override protected def innerChildren: Seq[LogicalPlan] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val tableName = tableIdent.unquotedString

@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat

@@ -479,13 +480,23 @@ case class DataSource(
       }
     }
 
+    // SPARK-17230: Resolve the partition columns so InsertIntoHadoopFsRelationCommand does
+    // not need to have the query as child, to avoid analyzing an optimized query,
+    // because InsertIntoHadoopFsRelationCommand will be optimized first.
+    val columns = partitionColumns.map { name =>

Inline review comment on the val columns line above:

Contributor: Can this be done before val partitionSchema = ... above? It looks like we're trying to analyze the columns there as well...

Contributor Author: This applies only to write(), which has no val partitionSchema = ... of its own (the other code paths do).

+      val plan = data.logicalPlan
+      plan.resolve(name :: Nil, data.sparkSession.sessionState.analyzer.resolver).getOrElse {
+        throw new AnalysisException(
+          s"Unable to resolve ${name} given [${plan.output.map(_.name).mkString(", ")}]")
+      }.asInstanceOf[Attribute]
+    }
     // For partitioned relation r, r.schema's column ordering can be different from the column
     // ordering of data.logicalPlan (partition columns are all moved after data columns). This
     // will be adjusted within InsertIntoHadoopFsRelation.
     val plan =
       InsertIntoHadoopFsRelationCommand(
         outputPath,
-        partitionColumns.map(UnresolvedAttribute.quoted),
+        columns,
         bucketSpec,
         format,
         () => Unit, // No existing table needs to be refreshed.
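
For context, the resolution pattern added above can be exercised from user code roughly as follows. This is a hedged sketch: the helper name `resolveColumn` is mine, IllegalArgumentException stands in for the Spark-internal AnalysisException (whose constructor is not public outside org.apache.spark.sql), and `sessionState` is only publicly reachable on newer Spark versions.

```scala
// Hedged sketch of the resolution step DataSource.write() now performs.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.Attribute

def resolveColumn(df: DataFrame, name: String): Attribute = {
  val plan = df.queryExecution.analyzed
  // LogicalPlan.resolve matches the name against the plan's output attributes
  // using the session's resolver, which honors spark.sql.caseSensitive.
  plan.resolve(name :: Nil, df.sparkSession.sessionState.analyzer.resolver)
    .getOrElse(throw new IllegalArgumentException(
      s"Unable to resolve $name given [${plan.output.map(_.name).mkString(", ")}]"))
    .asInstanceOf[Attribute]
}
```

Resolving the attributes eagerly in DataSource.write() is what allows InsertIntoHadoopFsRelationCommand to keep the query out of its children: nothing in the command requires further analysis.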

@@ -181,7 +181,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
         InsertIntoHadoopFsRelationCommand(
           outputPath,
-          t.partitionSchema.fields.map(_.name).map(UnresolvedAttribute(_)),
+          query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
           t.bucketSpec,
           t.fileFormat,
           () => t.refresh(),

@@ -66,7 +66,7 @@ case class InsertIntoHadoopFsRelationCommand(
     mode: SaveMode)
   extends RunnableCommand {
 
-  override def children: Seq[LogicalPlan] = query :: Nil
+  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     // Most formats don't do well with duplicate columns, so let's not allow that

@@ -449,6 +449,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     }
   }
 
+  test("SPARK-17230: write out results of decimal calculation") {
+    val df = spark.range(99, 101)
+      .selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num")
+    df.write.mode(SaveMode.Overwrite).parquet(dir)
+    val df2 = spark.read.parquet(dir)
+    checkAnswer(df2, df)
+  }
+
   private def testRead(
     df: => DataFrame,
     expectedResult: Seq[String],
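
The new test round-trips a decimal computation through Parquet; per the PR title, this is the shape of query that went wrong when an already-optimized plan was passed back into QueryExecution and optimized a second time. A hedged, standalone version of the same scenario (the local session setup and the /tmp output path are my assumptions; the suite itself runs under SharedSQLContext with its own temp dir):

```scala
// Standalone round-trip of the decimal computation from the regression test.
import org.apache.spark.sql.{SaveMode, SparkSession}

object Spark17230Repro extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("spark-17230")
    .getOrCreate()
  val dir = "/tmp/spark-17230-out"  // assumed scratch path

  val df = spark.range(99, 101)
    .selectExpr("id", "cast(id as long) * cast('1.0' as decimal(38, 18)) as num")
  df.write.mode(SaveMode.Overwrite).parquet(dir)

  // Before the fix, the write path re-analyzed and re-optimized the plan and
  // this decimal expression could come back wrong; after the fix the
  // round-trip matches `df`.
  spark.read.parquet(dir).show(truncate = false)
  spark.stop()
}
```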

@@ -42,7 +42,7 @@ case class CreateHiveTableAsSelectCommand(
 
   private val tableIdentifier = tableDesc.identifier
 
-  override def children: Seq[LogicalPlan] = Seq(query)
+  override def innerChildren: Seq[LogicalPlan] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     lazy val metastoreRelation: MetastoreRelation = {