[SPARK-25421][SQL] Abstract an output path field in trait DataWritingCommand #22411

Closed · wants to merge 6 commits

SparkPlanInfo.scala
@@ -17,9 +17,15 @@

package org.apache.spark.sql.execution

import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, DataWritingCommand, DataWritingCommandExec}
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.util.Utils

/**
 * :: DeveloperApi ::
@@ -57,12 +63,58 @@ private[execution] object SparkPlanInfo {
      new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
    }

    def makeOutputMetadata(
        path: Option[Path],
        outputColumnNames: Seq[String]): Map[String, String] = {
      val pathString = path match {
        case Some(p) if p != null => p.toString
        case _ => ""
      }
      Map("OutputPath" -> pathString,
        "OutputColumnNames" -> outputColumnNames.mkString("[", ", ", "]")
      )
    }

    def reflectTable(write: DataWritingCommand, className: String, field: String): CatalogTable = {
      val tableField = Utils.classForName(className).getDeclaredField(field)
      tableField.setAccessible(true)
      tableField.get(write).asInstanceOf[CatalogTable]
    }

    // dump the file scan / data writing metadata (e.g. file path) to event log
    val metadata = plan match {
      case fileScan: FileSourceScanExec => fileScan.metadata
      case DataWritingCommandExec(i: InsertIntoHadoopFsRelationCommand, _) =>
        makeOutputMetadata(Some(i.outputPath), i.outputColumnNames)
      case DataWritingCommandExec(d: DataWritingCommand, _) =>
        d.getClass.getCanonicalName match {
          case CREATE_DATA_SOURCE_TABLE_AS_SELECT_COMMAND =>
            val table = reflectTable(d, CREATE_DATA_SOURCE_TABLE_AS_SELECT_COMMAND, "table")
            makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames)
          case CREATE_HIVE_TABLE_AS_SELECT_COMMAND =>
            val table = reflectTable(d, CREATE_HIVE_TABLE_AS_SELECT_COMMAND, "tableDesc")
            makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames)
          case INSERT_INTO_HIVE_DIR_COMMAND =>
            val table = reflectTable(d, INSERT_INTO_HIVE_DIR_COMMAND, "table")
            makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames)
          case INSERT_INTO_HIVE_TABLE =>
            val table = reflectTable(d, INSERT_INTO_HIVE_TABLE, "table")
            makeOutputMetadata(table.storage.locationUri.map(new Path(_)), d.outputColumnNames)
          case _ => Map[String, String]()
        }
      case _ => Map[String, String]()
    }

    new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
      metadata, metrics)
  }

  private val CREATE_DATA_SOURCE_TABLE_AS_SELECT_COMMAND =
    classOf[CreateDataSourceTableAsSelectCommand].getCanonicalName
  private val CREATE_HIVE_TABLE_AS_SELECT_COMMAND =
    "org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand"
  private val INSERT_INTO_HIVE_DIR_COMMAND =
    "org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand"
  private val INSERT_INTO_HIVE_TABLE =
    "org.apache.spark.sql.hive.execution.InsertIntoHiveTable"
}
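
One way to consume the new keys: the SparkPlanInfo built here is what Spark already publishes on SparkListenerSQLExecutionStart, so a listener can read the output path from the event stream. The sketch below is illustrative only and not part of this patch; OutputPathListener and collectMetadata are hypothetical names introduced here, while the event and field names come from Spark's existing listener API.

// Illustrative sketch, not part of this PR.
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

class OutputPathListener extends SparkListener {
  // Walk the plan-info tree and gather every node's metadata map.
  private def collectMetadata(info: SparkPlanInfo): Seq[Map[String, String]] =
    info.metadata +: info.children.flatMap(collectMetadata)

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      collectMetadata(e.sparkPlanInfo)
        .filter(_.contains("OutputPath"))
        .foreach { m =>
          // "OutputPath" can be empty when the location is not resolved at planning time.
          println(s"output path: ${m("OutputPath")}, columns: ${m("OutputColumnNames")}")
        }
    case _ => // ignore other events
  }
}

Such a listener would be registered with spark.sparkContext.addSparkListener(new OutputPathListener) or through the spark.extraListeners configuration.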

@@ -21,7 +21,6 @@ import java.net.URI

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources._

SparkPlanSuite.scala
@@ -58,4 +58,13 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext {
      assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata.nonEmpty)
    }
  }

test("SPARK-25421 DataWritingCommandExec should contains 'OutputPath' metadata") {
withTable("t") {
sql("CREATE TABLE t(col_I int) USING PARQUET")
val f = sql("INSERT OVERWRITE TABLE t SELECT 1")
assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata
.contains("OutputPath"))
}
}
}
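
The reflection branches in SparkPlanInfo are only exercised by CTAS and Hive write plans, which the test above does not cover. A sketch of what a companion test might look like (not included in this patch; the table name t2 is an arbitrary choice): a CREATE TABLE ... AS SELECT is planned as DataWritingCommandExec(CreateDataSourceTableAsSelectCommand, ...), and makeOutputMetadata always emits the "OutputPath" key, even when the table location is not yet resolved at planning time.

  // Illustrative companion test, not part of this PR.
  test("SPARK-25421 CTAS plans should also carry 'OutputPath' metadata") {
    withTable("t2") {
      val f = sql("CREATE TABLE t2 USING PARQUET AS SELECT 1 AS col_I")
      assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata
        .contains("OutputPath"))
    }
  }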

@@ -21,7 +21,6 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand

@@ -30,7 +30,6 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.client.HiveClientImpl