Review comments.
marmbrus committed Nov 19, 2014
1 parent abd8e2f commit 645768b
Showing 1 changed file with 11 additions and 9 deletions.
@@ -40,7 +40,7 @@ import scala.collection.JavaConversions._
/**
* Allows creation of parquet based tables using the syntax
* `CREATE TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option required
- * is `path`, which is should be the location of a collection of, optionally partitioned,
+ * is `path`, which should be the location of a collection of, optionally partitioned,
* parquet files.
*/
class DefaultSource extends RelationProvider {
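
For context, here is a hedged sketch of the DDL the doc comment above describes, written against the data source API of that era. The table name, the path, and the TEMPORARY/OPTIONS details are illustrative assumptions rather than anything taken from this commit, and an existing SQLContext named `sqlContext` is assumed.

    // Sketch only: registers a parquet-backed table via the data source provider above.
    // `path` is the only required option; the location is made up for illustration.
    sqlContext.sql(
      """CREATE TEMPORARY TABLE events
        |USING org.apache.spark.sql.parquet
        |OPTIONS (path '/data/events')""".stripMargin)

    // The table can then be queried like any other registered table.
    sqlContext.sql("SELECT COUNT(*) FROM events").collect().foreach(println)
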
@@ -68,7 +68,8 @@ private[parquet] case class Partition(partitionValues: Map[String, Any], files:
* located at `path`. Currently only a single partitioning column is supported and it must
* be an integer. This class supports both fully self-describing data, which contains the partition
* key, and data where the partition key is only present in the folder structure. The presence
- * of the partitioning key in the data is also auto-detected.
+ * of the partitioning key in the data is also auto-detected. The `null` partition is not yet
+ * supported.
*
* Metadata: The metadata is automatically discovered by reading the first parquet file present.
* There is currently no support for working with files that have different schema. Additionally,
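
To make the partition-discovery description above concrete, a small sketch under stated assumptions: Hive-style `key=value` directory names under `path`, a single integer-valued partition column, and made-up file names. This illustrates the idea only and is not the code in this commit.

    // Assumed layout (names are illustrative):
    //   /data/events/part=1/part-00000.parquet
    //   /data/events/part=2/part-00000.parquet
    //
    // A minimal parse of one such directory name into a (key, value) pair,
    // analogous in spirit to recovering a single integer partition key.
    def parsePartitionDir(dirName: String): (String, Int) = {
      val Array(key, value) = dirName.split("=")
      (key, value.toInt)
    }

    parsePartitionDir("part=1")  // returns ("part", 1)
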
@@ -112,8 +113,12 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)

// Do a parallel lookup of partition metadata.
val partitionFiles =
- childDirs.par
-   .map(d => fs.listStatus(d.getPath).filterNot(_.getPath.getName.startsWith("_"))).seq
+ childDirs.par.map { d =>
+   fs.listStatus(d.getPath)
+     // TODO: Is there a standard hadoop function for this?
+     .filterNot(_.getPath.getName.startsWith("_"))
+     .filterNot(_.getPath.getName.startsWith("."))
+ }.seq

partitionKeys = foundKeys.toSeq
partitions = partitionFiles.zip(partitionPairs).map { case (files, (key, value)) =>
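
On the TODO in the added lines: as far as I can tell, Hadoop keeps its equivalent hidden-file filter private inside FileInputFormat, so a small hand-rolled PathFilter is the usual substitute. A hedged sketch follows; `fs` and `dir` are assumed to exist, and none of this is part of the commit.

    import org.apache.hadoop.fs.{Path, PathFilter}

    // Accepts only paths whose names do not start with "_" or ".", mirroring the
    // two filterNot calls in the new code.
    val visibleFilesOnly = new PathFilter {
      override def accept(p: Path): Boolean = {
        val name = p.getName
        !name.startsWith("_") && !name.startsWith(".")
      }
    }

    // FileSystem.listStatus has an overload that takes a PathFilter, so the listing
    // and the filtering could be combined in one call (illustrative usage):
    // fs.listStatus(dir, visibleFilesOnly)
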
@@ -127,7 +132,7 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)

override val sizeInBytes = partitions.flatMap(_.files).map(_.getLen).sum

- val dataSchema = StructType.fromAttributes(// TODO: Parquet code should not deal with attributes.
+ val dataSchema = StructType.fromAttributes( // TODO: Parquet code should not deal with attributes.
ParquetTypesConverter.readSchemaFromFile(
partitions.head.files.head.getPath,
Some(sparkContext.hadoopConfiguration),
@@ -140,10 +145,7 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
if (dataIncludesKey) {
dataSchema
} else {
- StructType(
-   StructField(partitionKeys.head, IntegerType) +:
-     dataSchema.fields
- )
+ StructType(dataSchema.fields :+ StructField(partitionKeys.head, IntegerType))
}

override def buildScan(output: Seq[Attribute], predicates: Seq[Expression]): RDD[Row] = {
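
For what the schema change above does in practice: when the data files do not contain the partition key, the key column is now appended after the data columns instead of being prepended in front of them. A hedged sketch with a made-up two-column data schema; the import path is the catalyst location of that era and may differ in later versions.

    import org.apache.spark.sql.catalyst.types.{IntegerType, StringType, StructField, StructType}

    // Hypothetical data schema read from the parquet files themselves.
    val exampleDataSchema = StructType(Seq(
      StructField("a", StringType, nullable = true),
      StructField("b", IntegerType, nullable = true)))

    // The new code appends the partition key column, yielding fields (a, b, key)
    // rather than the previous (key, a, b).
    val fullSchema =
      StructType(exampleDataSchema.fields :+ StructField("key", IntegerType, nullable = true))
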