SPARK-1293 [SQL] Parquet support for nested types #360


Closed. Wants to merge 48 commits.

Commits
aa688fe
Adding conversion of nested Parquet schemas
AndreSchumacher Mar 26, 2014
4d4892a
First commit nested Parquet read converters
AndreSchumacher Mar 27, 2014
6125c75
First working nested Parquet record input
AndreSchumacher Mar 27, 2014
745a42b
Completing testcase for nested data (Addressbook)
AndreSchumacher Apr 1, 2014
ddb40d2
Extending tests for nested Parquet data
AndreSchumacher Apr 1, 2014
1b1b3d6
Fixing one problem with nested arrays
AndreSchumacher Apr 2, 2014
5d80461
fixing one problem with nested structs and breaking up files
AndreSchumacher Apr 2, 2014
98219cf
added struct converter
AndreSchumacher Apr 2, 2014
ee70125
fixing one problem with arrayconverter
AndreSchumacher Apr 3, 2014
b7fcc35
Documenting conversions, bugfix, wrappers of Rows
AndreSchumacher Apr 4, 2014
6dbc9b7
Fixing some problems introduced during rebase
AndreSchumacher Apr 6, 2014
f8f8911
For primitive rows fall back to more efficient converter, code reorg
AndreSchumacher Apr 6, 2014
4e25fcb
Adding resolution of complex ArrayTypes
AndreSchumacher Apr 8, 2014
a594aed
Scalastyle
AndreSchumacher Apr 8, 2014
b539fde
First commit for MapType
AndreSchumacher Apr 11, 2014
824500c
Adding attribute resolution for MapType
AndreSchumacher Apr 11, 2014
f777b4b
Scalastyle
AndreSchumacher Apr 11, 2014
d1911dc
Simplifying ArrayType conversion
AndreSchumacher Apr 12, 2014
1dc5ac9
First version of WriteSupport for nested types
AndreSchumacher Apr 12, 2014
e99cc51
Fixing nested WriteSupport and adding tests
AndreSchumacher Apr 13, 2014
adc1258
Optimizing imports
AndreSchumacher Apr 13, 2014
f466ff0
Added ParquetAvro tests and revised Array conversion
AndreSchumacher Apr 13, 2014
79d81d5
Replacing field names for array and map in WriteSupport
AndreSchumacher Apr 13, 2014
619c397
Completing Map testcase
AndreSchumacher Apr 14, 2014
c52ff2c
Adding native-array converter
AndreSchumacher Apr 19, 2014
431f00f
Fixing problems introduced during rebase
AndreSchumacher Apr 19, 2014
a6b4f05
Cleaning up ArrayConverter, moving classTag to NativeType, adding Nat…
AndreSchumacher Apr 20, 2014
0ae9376
Doc strings and simplifying ParquetConverter.scala
AndreSchumacher May 10, 2014
32229c7
Removing Row nested values and placing by generic types
AndreSchumacher May 11, 2014
cbb5793
Code review feedback
AndreSchumacher May 11, 2014
191bc0d
Changing to Seq for ArrayType, refactoring SQLParser for nested field…
AndreSchumacher May 24, 2014
2f5a805
Removing stripMargin from test schemas
AndreSchumacher May 24, 2014
de02538
Cleaning up ParquetTestData
AndreSchumacher May 24, 2014
31465d6
Scalastyle: fixing commented out bottom
AndreSchumacher May 24, 2014
3c6b25f
Trying to reduce no-op changes wrt master
AndreSchumacher Jun 1, 2014
3104886
Nested Rows should be Rows, not Seqs.
marmbrus Jun 3, 2014
f7aeba3
[SPARK-1982] Support for ByteType and ShortType.
marmbrus Jun 3, 2014
3e1456c
WIP: Directly serialize catalyst attributes.
marmbrus Jun 4, 2014
14c3fd8
Attempting to fix Spark-Parquet schema conversion
AndreSchumacher Jun 4, 2014
37e0a0a
Cleaning up
AndreSchumacher Jun 4, 2014
88e6bdb
Attempting to fix loss of schema
AndreSchumacher Jun 4, 2014
63d1b57
Cleaning up and Scalastyle
AndreSchumacher Jun 8, 2014
b8a8b9a
More fixes to short and byte conversion
AndreSchumacher Jun 8, 2014
403061f
Fixing some issues with tests and schema metadata
AndreSchumacher Jun 8, 2014
94eea3a
Scalastyle
AndreSchumacher Jun 8, 2014
7eceb67
Review feedback
AndreSchumacher Jun 19, 2014
95c1367
Changes to ParquetRelation and its metadata
AndreSchumacher Jun 19, 2014
30708c8
Taking out AvroParquet test for now to remove Avro dependency
AndreSchumacher Jun 20, 2014

@@ -66,43 +66,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected case class Keyword(str: String)

protected implicit def asParser(k: Keyword): Parser[String] =
allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)

protected class SqlLexical extends StdLexical {
case class FloatLit(chars: String) extends Token {
override def toString = chars
}
override lazy val token: Parser[Token] = (
identChar ~ rep( identChar | digit ) ^^
{ case first ~ rest => processIdent(first :: rest mkString "") }
| rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
case i ~ None => NumericLit(i mkString "")
case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
}
| '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
{ case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
| '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
{ case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
| EofCh ^^^ EOF
| '\'' ~> failure("unclosed string literal")
| '\"' ~> failure("unclosed string literal")
| delim
| failure("illegal character")
)

override def identChar = letter | elem('.') | elem('_')

override def whitespace: Parser[Any] = rep(
whitespaceChar
| '/' ~ '*' ~ comment
| '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
| '#' ~ rep( chrExcept(EofCh, '\n') )
| '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
| '/' ~ '*' ~ failure("unclosed comment")
)
}

override val lexical = new SqlLexical
lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)

protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
@@ -161,24 +125,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
this.getClass
.getMethods
.filter(_.getReturnType == classOf[Keyword])
.map(_.invoke(this).asInstanceOf[Keyword])

/** Generate all variations of upper and lower case of a given string */
private def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
if (s == "") {
Stream(prefix)
} else {
allCaseVersions(s.tail, prefix + s.head.toLower) ++
allCaseVersions(s.tail, prefix + s.head.toUpper)
}
}
.map(_.invoke(this).asInstanceOf[Keyword].str)

lexical.reserved ++= reservedWords.flatMap(w => allCaseVersions(w.str))

lexical.delimiters += (
"@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
",", ";", "%", "{", "}", ":", "[", "]"
)
override val lexical = new SqlLexical(reservedWords)

protected def assignAliases(exprs: Seq[Expression]): Seq[NamedExpression] = {
exprs.zipWithIndex.map {
@@ -383,7 +332,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)

protected lazy val baseExpression: PackratParser[Expression] =
expression ~ "[" ~ expression <~ "]" ^^ {
expression ~ "[" ~ expression <~ "]" ^^ {
case base ~ _ ~ ordinal => GetItem(base, ordinal)
} |
TRUE ^^^ Literal(true, BooleanType) |
@@ -399,3 +348,55 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected lazy val dataType: Parser[DataType] =
STRING ^^^ StringType
}

class SqlLexical(val keywords: Seq[String]) extends StdLexical {
case class FloatLit(chars: String) extends Token {
override def toString = chars
}

reserved ++= keywords.flatMap(w => allCaseVersions(w))

delimiters += (
"@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
",", ";", "%", "{", "}", ":", "[", "]"
)

override lazy val token: Parser[Token] = (
identChar ~ rep( identChar | digit ) ^^
{ case first ~ rest => processIdent(first :: rest mkString "") }
| rep1(digit) ~ opt('.' ~> rep(digit)) ^^ {
case i ~ None => NumericLit(i mkString "")
case i ~ Some(d) => FloatLit(i.mkString("") + "." + d.mkString(""))
}
| '\'' ~ rep( chrExcept('\'', '\n', EofCh) ) ~ '\'' ^^
{ case '\'' ~ chars ~ '\'' => StringLit(chars mkString "") }
| '\"' ~ rep( chrExcept('\"', '\n', EofCh) ) ~ '\"' ^^
{ case '\"' ~ chars ~ '\"' => StringLit(chars mkString "") }
| EofCh ^^^ EOF
| '\'' ~> failure("unclosed string literal")
| '\"' ~> failure("unclosed string literal")
| delim
| failure("illegal character")
)

override def identChar = letter | elem('_') | elem('.')

override def whitespace: Parser[Any] = rep(
whitespaceChar
| '/' ~ '*' ~ comment
| '/' ~ '/' ~ rep( chrExcept(EofCh, '\n') )
| '#' ~ rep( chrExcept(EofCh, '\n') )
| '-' ~ '-' ~ rep( chrExcept(EofCh, '\n') )
| '/' ~ '*' ~ failure("unclosed comment")
)

/** Generate all variations of upper and lower case of a given string */
def allCaseVersions(s: String, prefix: String = ""): Stream[String] = {
if (s == "") {
Stream(prefix)
} else {
allCaseVersions(s.tail, prefix + s.head.toLower) ++
allCaseVersions(s.tail, prefix + s.head.toUpper)
}
}
}

@@ -50,6 +50,8 @@ case class GetItem(child: Expression, ordinal: Expression) extends Expression {
null
} else {
if (child.dataType.isInstanceOf[ArrayType]) {
// TODO: consider using Array[_] for ArrayType child to avoid
// boxing of primitives
val baseValue = value.asInstanceOf[Seq[_]]
val o = key.asInstanceOf[Int]
if (o >= baseValue.size || o < 0) {

@@ -19,35 +19,108 @@ package org.apache.spark.sql.catalyst.types

import java.sql.Timestamp

import scala.reflect.runtime.universe.{typeTag, TypeTag}
import scala.util.parsing.combinator.RegexParsers

import org.apache.spark.sql.catalyst.expressions.Expression
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror}

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.util.Utils

/**
*
*/
object DataType extends RegexParsers {
protected lazy val primitiveType: Parser[DataType] =
"StringType" ^^^ StringType |
"FloatType" ^^^ FloatType |
"IntegerType" ^^^ IntegerType |
"ByteType" ^^^ ByteType |
"ShortType" ^^^ ShortType |
"DoubleType" ^^^ DoubleType |
"LongType" ^^^ LongType |
"BinaryType" ^^^ BinaryType |
"BooleanType" ^^^ BooleanType |
"DecimalType" ^^^ DecimalType |
"TimestampType" ^^^ TimestampType

protected lazy val arrayType: Parser[DataType] =
"ArrayType" ~> "(" ~> dataType <~ ")" ^^ ArrayType

protected lazy val mapType: Parser[DataType] =
"MapType" ~> "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ {
case t1 ~ _ ~ t2 => MapType(t1, t2)
}

protected lazy val structField: Parser[StructField] =
("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
case name ~ tpe ~ nullable =>
StructField(name, tpe, nullable = nullable)
}

protected lazy val boolVal: Parser[Boolean] =
"true" ^^^ true |
"false" ^^^ false


protected lazy val structType: Parser[DataType] =
"StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
case fields => new StructType(fields)
}

protected lazy val dataType: Parser[DataType] =
arrayType |
mapType |
structType |
primitiveType

/**
* Parses a string representation of a DataType.
*
* TODO: Generate parser as pickler...
*/
def apply(asString: String): DataType = parseAll(dataType, asString) match {
case Success(result, _) => result
case failure: NoSuccess => sys.error(s"Unsupported dataType: $asString, $failure")
}
}
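
For reference, a minimal sketch of how this string-based parser might be used, assuming only the grammar shown above (it round-trips the default case-class toString representation; the example is illustrative and not part of the patch):

import org.apache.spark.sql.catalyst.types._

// Parse string representations back into DataType instances.
val arrayOfInts: DataType = DataType("ArrayType(IntegerType)")
val stringToDouble: DataType = DataType("MapType(StringType,DoubleType)")

// Since these strings match the default toString of the case classes,
// DataType(t.toString) should reproduce an equal t for supported types.
assert(DataType(arrayOfInts.toString) == arrayOfInts)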

abstract class DataType {
/** Matches any expression that evaluates to this DataType */
def unapply(a: Expression): Boolean = a match {
case e: Expression if e.dataType == this => true
case _ => false
}

def isPrimitive: Boolean = false
}

case object NullType extends DataType

trait PrimitiveType extends DataType {

Contributor commented:
What are the semantics of PrimitiveType? Specifically, I'm surprised that StringType and DecimalType are considered PrimitiveTypes. Also I wonder if we can unify this with NativeType somehow. I'm not really sure, but I'd like to avoid too much explosion here.

Contributor Author replied:
@marmbrus PrimitiveType is maybe a misnomer. It's the same term that Parquet uses. Basically a PrimitiveType is a type that is not contained inside another type (so non-nested). You can argue that a String is a Char array and therefore not primitive but in terms of constructing nested rows it means that a primitive type is a leaf inside the tree that produces a record.

It would help to somehow distinguish between nested and non-nested types. NativeType comes close but for example there is BinaryType which is primitive but not native.
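
A minimal sketch of the distinction described above, using only the types added in this patch (field names are made up for illustration; the leaves of the record tree are PrimitiveTypes, the containers around them are not):

import org.apache.spark.sql.catalyst.types._

// Illustrative nested schema: the leaves (StringType, BinaryType) are
// "primitive" in the Parquet sense, i.e. not contained inside another type,
// while ArrayType and StructType are the nested containers around them.
// Note that BinaryType is a PrimitiveType but not a NativeType.
val addressBook = StructType(Seq(
  StructField("owner", StringType, nullable = false),
  StructField(
    "contacts",
    ArrayType(StructType(Seq(
      StructField("name", StringType, nullable = false),
      StructField("phoneNumber", BinaryType, nullable = true)))),
    nullable = true)))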

override def isPrimitive = true
}

abstract class NativeType extends DataType {
type JvmType
@transient val tag: TypeTag[JvmType]
val ordering: Ordering[JvmType]

@transient val classTag = {
val mirror = runtimeMirror(Utils.getSparkClassLoader)
ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
}
}

case object StringType extends NativeType {
case object StringType extends NativeType with PrimitiveType {
type JvmType = String
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
}
case object BinaryType extends DataType {
case object BinaryType extends DataType with PrimitiveType {
type JvmType = Array[Byte]
}
case object BooleanType extends NativeType {
case object BooleanType extends NativeType with PrimitiveType {
type JvmType = Boolean
@transient lazy val tag = typeTag[JvmType]
val ordering = implicitly[Ordering[JvmType]]
@@ -63,7 +136,7 @@ case object TimestampType extends NativeType {
}
}

abstract class NumericType extends NativeType {
abstract class NumericType extends NativeType with PrimitiveType {
// Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
// implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
// type parameter and and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
@@ -154,6 +227,17 @@ case object FloatType extends FractionalType {
case class ArrayType(elementType: DataType) extends DataType

case class StructField(name: String, dataType: DataType, nullable: Boolean)
case class StructType(fields: Seq[StructField]) extends DataType

object StructType {
def fromAttributes(attributes: Seq[Attribute]): StructType = {
StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
}

// def apply(fields: Seq[StructField]) = new StructType(fields.toIndexedSeq)
}

case class StructType(fields: Seq[StructField]) extends DataType {
def toAttributes = fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
}

case class MapType(keyType: DataType, valueType: DataType) extends DataType

@@ -94,7 +94,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @group userf
*/
def parquetFile(path: String): SchemaRDD =
new SchemaRDD(this, parquet.ParquetRelation(path))
new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration)))

/**
* Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].

@@ -99,7 +99,9 @@ class JavaSQLContext(val sqlContext: SQLContext) {
* Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
*/
def parquetFile(path: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, ParquetRelation(path))
new JavaSchemaRDD(
sqlContext,
ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration)))

/**
* Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].

@@ -154,7 +154,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.WriteToFile(path, child) =>
val relation =
ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
// Note: overwrite=false because otherwise the metadata we just created will be deleted
InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>