[SPARK-7591] [SQL] Partitioning support API tweaks #6150

Closed
wants to merge 6 commits into from
14 changes: 7 additions & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.catalyst.ParserDialect
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json._
import org.apache.spark.sql.parquet.FSBasedParquetRelation
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -610,7 +610,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
} else if (conf.parquetUseDataSourceApi) {
val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
baseRelationToDataFrame(
new FSBasedParquetRelation(
new ParquetRelation2(
globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
} else {
DataFrame(this, parquet.ParquetRelation(
@@ -989,7 +989,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def jdbc(url: String, table: String): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), new Properties())
}

/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1002,7 +1002,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), properties)
}

/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1020,7 +1020,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
@Experimental
def jdbc(
url: String,
table: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
@@ -1056,7 +1056,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
val parts = JDBCRelation.columnPartition(partitioning)
jdbc(url, table, parts, properties)
}

/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
@@ -1093,7 +1093,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
jdbc(url, table, parts, properties)
}

private def jdbc(
url: String,
table: String,
@@ -41,27 +41,23 @@ import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}

private[sql] class DefaultSource extends FSBasedRelationProvider {
private[sql] class DefaultSource extends HadoopFsRelationProvider {
override def createRelation(
sqlContext: SQLContext,
paths: Array[String],
schema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): FSBasedRelation = {
parameters: Map[String, String]): HadoopFsRelation = {
val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty))
new FSBasedParquetRelation(paths, schema, partitionSpec, parameters)(sqlContext)
new ParquetRelation2(paths, schema, partitionSpec, parameters)(sqlContext)
}
}

// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[sql] class ParquetOutputWriter extends OutputWriter {
private var recordWriter: RecordWriter[Void, Row] = _
private var taskAttemptContext: TaskAttemptContext = _

override def init(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): Unit = {
private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
extends OutputWriter {

private val recordWriter: RecordWriter[Void, Row] = {
val conf = context.getConfiguration
val outputFormat = {
// When appending new Parquet files to an existing Parquet file directory, to avoid
@@ -77,7 +73,7 @@ private[sql] class ParquetOutputWriter extends OutputWriter {
if (fs.exists(outputPath)) {
// Pattern used to match task ID in part file names, e.g.:
//
// part-r-00001.gz.part
// part-r-00001.gz.parquet
// ^~~~~
val partFilePattern = """part-.-(\d{1,}).*""".r

@@ -86,9 +82,8 @@ private[sql] class ParquetOutputWriter extends OutputWriter {
case name if name.startsWith("_") => 0
case name if name.startsWith(".") => 0
case name => sys.error(
s"""Trying to write Parquet files to directory $outputPath,
|but found items with illegal name "$name"
""".stripMargin.replace('\n', ' ').trim)
s"Trying to write Parquet files to directory $outputPath, " +
s"but found items with illegal name '$name'.")
}.reduceOption(_ max _).getOrElse(0)
} else {
0
@@ -111,37 +106,39 @@ private[sql] class ParquetOutputWriter extends OutputWriter {
}
}

recordWriter = outputFormat.getRecordWriter(context)
taskAttemptContext = context
outputFormat.getRecordWriter(context)
}

override def write(row: Row): Unit = recordWriter.write(null, row)

override def close(): Unit = recordWriter.close(taskAttemptContext)
override def close(): Unit = recordWriter.close(context)
}

private[sql] class FSBasedParquetRelation(
paths: Array[String],
private[sql] class ParquetRelation2(
override val paths: Array[String],
private val maybeDataSchema: Option[StructType],
private val maybePartitionSpec: Option[PartitionSpec],
parameters: Map[String, String])(
val sqlContext: SQLContext)
extends FSBasedRelation(paths, maybePartitionSpec)
extends HadoopFsRelation(maybePartitionSpec)
with Logging {

// Should we merge schemas from all Parquet part-files?
private val shouldMergeSchemas =
parameters.getOrElse(FSBasedParquetRelation.MERGE_SCHEMA, "true").toBoolean
parameters.getOrElse(ParquetRelation2.MERGE_SCHEMA, "true").toBoolean

private val maybeMetastoreSchema = parameters
.get(FSBasedParquetRelation.METASTORE_SCHEMA)
.get(ParquetRelation2.METASTORE_SCHEMA)
.map(DataType.fromJson(_).asInstanceOf[StructType])

private val metadataCache = new MetadataCache
metadataCache.refresh()
private lazy val metadataCache: MetadataCache = {
val meta = new MetadataCache
meta.refresh()
meta
}

override def equals(other: scala.Any): Boolean = other match {
case that: FSBasedParquetRelation =>
case that: ParquetRelation2 =>
val schemaEquality = if (shouldMergeSchemas) {
this.shouldMergeSchemas == that.shouldMergeSchemas
} else {
@@ -175,8 +172,6 @@ private[sql] class FSBasedParquetRelation(
}
}

override def outputWriterClass: Class[_ <: OutputWriter] = classOf[ParquetOutputWriter]

override def dataSchema: StructType = metadataCache.dataSchema

override private[sql] def refresh(): Unit = {
@@ -187,9 +182,12 @@ private[sql] class FSBasedParquetRelation(
// Parquet data source always uses Catalyst internal representations.
override val needConversion: Boolean = false

override val sizeInBytes = metadataCache.dataStatuses.map(_.getLen).sum
override def sizeInBytes: Long = metadataCache.dataStatuses.map(_.getLen).sum

override def userDefinedPartitionColumns: Option[StructType] =
maybePartitionSpec.map(_.partitionColumns)

override def prepareForWrite(job: Job): Unit = {
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
val conf = ContextUtil.getConfiguration(job)

val committerClass =
@@ -224,6 +222,13 @@ private[sql] class FSBasedParquetRelation(
.getOrElse(
sqlContext.conf.parquetCompressionCodec.toUpperCase,
CompressionCodecName.UNCOMPRESSED).name())

new OutputWriterFactory {
override def newInstance(
path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = {
new ParquetOutputWriter(path, context)
}
}
}

override def buildScan(
@@ -385,7 +390,7 @@ private[sql] class FSBasedParquetRelation(
// case insensitivity issue and possible schema mismatch (probably caused by schema
// evolution).
maybeMetastoreSchema
.map(FSBasedParquetRelation.mergeMetastoreParquetSchema(_, dataSchema0))
.map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
.getOrElse(dataSchema0)
}
}
@@ -439,12 +444,12 @@ private[sql] class FSBasedParquetRelation(
"No schema defined, " +
s"and no Parquet data file or summary file found under ${paths.mkString(", ")}.")

FSBasedParquetRelation.readSchema(filesToTouch.map(footers.apply), sqlContext)
ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
}
}
}

private[sql] object FSBasedParquetRelation extends Logging {
private[sql] object ParquetRelation2 extends Logging {
// Whether we should merge schemas collected from all Parquet part-files.
private[sql] val MERGE_SCHEMA = "mergeSchema"

@@ -59,7 +59,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
(a, _) => t.buildScan(a)) :: Nil

// Scanning partitioned FSBasedRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation))
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
if t.partitionSpec.partitionColumns.nonEmpty =>
val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray

@@ -87,7 +87,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
selectedPartitions) :: Nil

// Scanning non-partitioned FSBasedRelation
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: FSBasedRelation)) =>
case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
val inputPaths = t.paths.map(new Path(_)).flatMap { path =>
val fs = path.getFileSystem(t.sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPath = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
@@ -111,10 +111,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil

case i @ logical.InsertIntoTable(
l @ LogicalRelation(t: FSBasedRelation), part, query, overwrite, false) if part.isEmpty =>
l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) if part.isEmpty =>
val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
execution.ExecutedCommand(
InsertIntoFSBasedRelation(t, query, Array.empty[String], mode)) :: Nil
InsertIntoHadoopFsRelation(t, query, Array.empty[String], mode)) :: Nil

case _ => Nil
}
@@ -126,7 +126,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
partitionColumns: StructType,
partitions: Array[Partition]) = {
val output = projections.map(_.toAttribute)
val relation = logicalRelation.relation.asInstanceOf[FSBasedRelation]
val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]

// Builds RDD[Row]s for each selected partition.
val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
@@ -35,6 +35,10 @@ private[sql] case class Partition(values: Row, path: String)
private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])

private[sql] object PartitioningUtils {
// This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't
// depend on Hive.
private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__"
Contributor:
can the hive one reference this one?

Contributor Author:
Not sure whether this is a good idea, since we are trying to be Hive compatible... Definitions from official Hive code should be the "standard" here.
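For readers unfamiliar with the sentinel: Hive writes null partition values into a directory named after this constant (e.g. country=__HIVE_DEFAULT_PARTITION__). A minimal sketch of that substitution follows; the helper object and method names are invented for illustration and are not code from this PR or from Spark itself.

object DefaultPartitionNameSketch {
  // Same sentinel value as PartitioningUtils.DEFAULT_PARTITION_NAME above.
  val DefaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  // Renders one partition column as a "name=value" path segment, falling back to the
  // sentinel when the value is null or empty so the directory name stays Hive-compatible.
  def partitionSegment(columnName: String, value: Any): String = {
    val rendered =
      Option(value).map(_.toString).filter(_.nonEmpty).getOrElse(DefaultPartitionName)
    s"$columnName=$rendered"
  }

  // Joins the segments into a relative partition directory path.
  def partitionPath(columns: Seq[(String, Any)]): String =
    columns.map { case (name, value) => partitionSegment(name, value) }.mkString("/")

  def main(args: Array[String]): Unit = {
    // country is null, so its directory becomes country=__HIVE_DEFAULT_PARTITION__
    println(partitionPath(Seq("date" -> "2015-05-13", "country" -> null)))
    // prints: date=2015-05-13/country=__HIVE_DEFAULT_PARTITION__
  }
}

Hive-style partition layouts also escape characters that are illegal in file names; the sketch omits that and only shows the null fallback this constant exists for.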


private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
require(columnNames.size == literals.size)
}
23 changes: 10 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -58,8 +58,8 @@ private[sql] case class InsertIntoDataSource(
}
}

private[sql] case class InsertIntoFSBasedRelation(
@transient relation: FSBasedRelation,
private[sql] case class InsertIntoHadoopFsRelation(
@transient relation: HadoopFsRelation,
@transient query: LogicalPlan,
partitionColumns: Array[String],
mode: SaveMode)
@@ -102,7 +102,7 @@ private[sql] case class InsertIntoFSBasedRelation(
insert(new DefaultWriterContainer(relation, job), df)
} else {
val writerContainer = new DynamicPartitionWriterContainer(
relation, job, partitionColumns, "__HIVE_DEFAULT_PARTITION__")
relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME)
insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
}
}
@@ -234,7 +234,7 @@ private[sql] case class InsertIntoFSBasedRelation(
}

private[sql] abstract class BaseWriterContainer(
@transient val relation: FSBasedRelation,
@transient val relation: HadoopFsRelation,
@transient job: Job)
extends SparkHadoopMapReduceUtil
with Logging
@@ -261,15 +261,15 @@ private[sql] abstract class BaseWriterContainer(

protected val dataSchema = relation.dataSchema

protected val outputWriterClass: Class[_ <: OutputWriter] = relation.outputWriterClass
protected var outputWriterFactory: OutputWriterFactory = _

private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _

def driverSideSetup(): Unit = {
setupIDs(0, 0, 0)
setupConf()
taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
relation.prepareForWrite(job)
outputWriterFactory = relation.prepareJobForWrite(job)
outputFormatClass = job.getOutputFormatClass
outputCommitter = newOutputCommitter(taskAttemptContext)
outputCommitter.setupJob(jobContext)
@@ -346,16 +346,15 @@ private[sql] abstract class BaseWriterContainer(
}

private[sql] class DefaultWriterContainer(
@transient relation: FSBasedRelation,
@transient relation: HadoopFsRelation,
@transient job: Job)
extends BaseWriterContainer(relation, job) {

@transient private var writer: OutputWriter = _

override protected def initWriters(): Unit = {
writer = outputWriterClass.newInstance()
taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
writer.init(getWorkPath, dataSchema, taskAttemptContext)
writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
}

override def outputWriterForRow(row: Row): OutputWriter = writer
@@ -372,7 +371,7 @@ private[sql] class DefaultWriterContainer(
}

private[sql] class DynamicPartitionWriterContainer(
@transient relation: FSBasedRelation,
@transient relation: HadoopFsRelation,
@transient job: Job,
partitionColumns: Array[String],
defaultPartitionName: String)
@@ -398,12 +397,10 @@ private[sql] class DynamicPartitionWriterContainer(

outputWriters.getOrElseUpdate(partitionPath, {
val path = new Path(getWorkPath, partitionPath)
val writer = outputWriterClass.newInstance()
taskAttemptContext.getConfiguration.set(
"spark.sql.sources.output.path",
new Path(outputPath, partitionPath).toString)
writer.init(path.toString, dataSchema, taskAttemptContext)
writer
outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
})
}
