
Commit aa688fe

Adding conversion of nested Parquet schemas
1 parent 67fca18 commit aa688fe

3 files changed: 196 additions & 27 deletions

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 66 additions & 24 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.parquet
 
-import java.io.IOException
+import java.io.IOException,
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -26,9 +26,10 @@ import org.apache.hadoop.mapreduce.Job
 
 import parquet.hadoop.util.ContextUtil
 import parquet.hadoop.{ParquetOutputFormat, Footer, ParquetFileWriter, ParquetFileReader}
+
 import parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
 import parquet.io.api.{Binary, RecordConsumer}
-import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveType, MessageType, MessageTypeParser}
+import parquet.schema.{Type => ParquetType, PrimitiveType => ParquetPrimitiveType, MessageType, MessageTypeParser, GroupType => ParquetGroupType, OriginalType => ParquetOriginalType, ConversionPatterns}
 import parquet.schema.PrimitiveType.{PrimitiveTypeName => ParquetPrimitiveTypeName}
 import parquet.schema.Type.Repetition

@@ -172,7 +173,7 @@ private[sql] object ParquetRelation {
 }
 
 private[parquet] object ParquetTypesConverter {
-  def toDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match {
+  def toPrimitiveDataType(parquetType : ParquetPrimitiveTypeName): DataType = parquetType match {
     // for now map binary to string type
     // TODO: figure out how Parquet uses strings or why we can't use them in a MessageType schema
     case ParquetPrimitiveTypeName.BINARY => StringType
@@ -190,15 +191,61 @@ private[parquet] object ParquetTypesConverter {
       s"Unsupported parquet datatype $parquetType")
   }
 
-  def fromDataType(ctype: DataType): ParquetPrimitiveTypeName = ctype match {
-    case StringType => ParquetPrimitiveTypeName.BINARY
-    case BooleanType => ParquetPrimitiveTypeName.BOOLEAN
-    case DoubleType => ParquetPrimitiveTypeName.DOUBLE
-    case ArrayType(ByteType) => ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-    case FloatType => ParquetPrimitiveTypeName.FLOAT
-    case IntegerType => ParquetPrimitiveTypeName.INT32
-    case LongType => ParquetPrimitiveTypeName.INT64
-    case _ => sys.error(s"Unsupported datatype $ctype")
+  def toDataType(parquetType: ParquetType): DataType = {
+    if (parquetType.isPrimitive) toPrimitiveDataType(parquetType.asPrimitiveType.getPrimitiveTypeName)
+    else {
+      val groupType = parquetType.asGroupType()
+      parquetType.getOriginalType match {
+        case ParquetOriginalType.LIST | ParquetOriginalType.ENUM => {
+          val fields = groupType.getFields.map(toDataType(_))
+          new ArrayType(fields.apply(0)) // array fields should have the same type
+        }
+        case _ => { // everything else nested becomes a Struct
+          val fields = groupType
+            .getFields
+            .map(ptype => new StructField(
+              ptype.getName,
+              toDataType(ptype),
+              ptype.getRepetition != Repetition.REQUIRED))
+          new StructType(fields)
+        }
+      }
+    }
+  }
+
+  def fromPrimitiveDataType(ctype: DataType): Option[ParquetPrimitiveTypeName] = ctype match {
+    case StringType => Some(ParquetPrimitiveTypeName.BINARY)
+    case BooleanType => Some(ParquetPrimitiveTypeName.BOOLEAN)
+    case DoubleType => Some(ParquetPrimitiveTypeName.DOUBLE)
+    case ArrayType(ByteType) => Some(ParquetPrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
+    case FloatType => Some(ParquetPrimitiveTypeName.FLOAT)
+    case IntegerType => Some(ParquetPrimitiveTypeName.INT32)
+    case LongType => Some(ParquetPrimitiveTypeName.INT64)
+    case _ => None
+  }
+
+  def fromComplexDataType(ctype: DataType, name: String, nullable: Boolean = true): ParquetType = {
+    val repetition =
+      if (nullable) Repetition.OPTIONAL
+      else Repetition.REQUIRED
+    val primitiveType = fromPrimitiveDataType(ctype)
+    if (primitiveType.isDefined) {
+      new ParquetPrimitiveType(repetition, primitiveType.get, name)
+    } else {
+      ctype match {
+        case ArrayType(elementType: DataType) => {
+          val parquetElementType = fromComplexDataType(elementType, name + "_values", false)
+          new ParquetGroupType(repetition, name, parquetElementType)
+        }
+        case StructType(structFields) => {
+          val fields = structFields.map {
+            field => fromComplexDataType(field.dataType, field.name, false)
+          }
+          new ParquetGroupType(repetition, name, fields)
+        }
+        case _ => sys.error(s"Unsupported datatype $ctype")
+      }
+    }
   }
 
   def consumeType(consumer: RecordConsumer, ctype: DataType, record: Row, index: Int): Unit = {
@@ -217,23 +264,18 @@
     }
   }
 
-  def getSchema(schemaString : String) : MessageType =
+  def getSchema(schemaString: String) : MessageType =
     MessageTypeParser.parseMessageType(schemaString)
 
-  def convertToAttributes(parquetSchema: MessageType) : Seq[Attribute] = {
-    parquetSchema.getColumns.map {
-      case (desc) =>
-        val ctype = toDataType(desc.getType)
-        val name: String = desc.getPath.mkString(".")
-        new AttributeReference(name, ctype, false)()
-    }
+  def convertToAttributes(parquetSchema: ParquetType): Seq[Attribute] = {
+    parquetSchema
+      .asGroupType()
+      .getFields
+      .map(field => new AttributeReference(field.getName, toDataType(field), field.getRepetition != Repetition.REQUIRED)())
   }
 
-  // TODO: allow nesting?
   def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
-    val fields: Seq[ParquetType] = attributes.map {
-      a => new ParquetPrimitiveType(Repetition.OPTIONAL, fromDataType(a.dataType), a.name)
-    }
+    val fields = attributes.map(attribute => fromComplexDataType(attribute.dataType, attribute.name, attribute.nullable))
     new MessageType("root", fields)
   }
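
Taken together, toPrimitiveDataType, toDataType, fromPrimitiveDataType and fromComplexDataType make the schema conversion recursive in both directions: a Parquet group annotated as LIST or ENUM becomes an ArrayType, any other group becomes a StructType, and convertFromAttributes can now emit nested group types instead of only optional primitives. A minimal sketch of how the new convertToAttributes could be exercised against a nested schema (not part of this commit; the object name is made up, and it assumes the code sits in org.apache.spark.sql.parquet so that the private[parquet] converter is visible):

    package org.apache.spark.sql.parquet

    import parquet.schema.MessageTypeParser

    // Hypothetical driver, for illustration only.
    object NestedSchemaConversionExample {
      def main(args: Array[String]): Unit = {
        // A nested schema in the style of the AddressBook example used by the tests below.
        val parquetSchema = MessageTypeParser.parseMessageType(
          """
            |message AddressBook {
            |  required binary owner;
            |  repeated group contacts {
            |    required binary name;
            |    optional binary phoneNumber;
            |  }
            |}
          """.stripMargin)

        // Each top-level Parquet field becomes one Attribute; the contacts group is
        // converted recursively by toDataType into a StructType of name and phoneNumber.
        ParquetTypesConverter.convertToAttributes(parquetSchema).foreach { attr =>
          println(s"${attr.name}: ${attr.dataType} (nullable = ${attr.nullable})")
        }
      }
    }

For the schema above one would expect owner to come back as a non-nullable StringType (binary is mapped to string for now) and contacts as a StructType, since the repeated group carries no LIST annotation.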

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala

Lines changed: 93 additions & 3 deletions
@@ -17,14 +17,19 @@
 
 package org.apache.spark.sql.parquet
 
+import java.io.File
+
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.mapreduce.Job
 
 import parquet.example.data.{GroupWriter, Group}
 import parquet.example.data.simple.SimpleGroup
-import parquet.hadoop.ParquetWriter
+import parquet.hadoop.{ParquetReader, ParquetFileReader, ParquetWriter}
 import parquet.hadoop.api.WriteSupport
 import parquet.hadoop.api.WriteSupport.WriteContext
+import parquet.hadoop.example.GroupReadSupport
+import parquet.hadoop.util.ContextUtil
 import parquet.io.api.RecordConsumer
 import parquet.schema.{MessageType, MessageTypeParser}

@@ -100,9 +105,39 @@ private[sql] object ParquetTestData {
 
   lazy val testData = new ParquetRelation(testDir.toURI.toString)
 
+  val testNestedSchema1 =
+    // from blogpost example, source:
+    // https://blog.twitter.com/2013/dremel-made-simple-with-parquet
+    // note: instead of string we have to use binary (?) otherwise
+    // Parquet gives us:
+    // IllegalArgumentException: expected one of [INT64, INT32, BOOLEAN,
+    //   BINARY, FLOAT, DOUBLE, INT96, FIXED_LEN_BYTE_ARRAY]
+    """
+      |message AddressBook {
+      |required binary owner;
+      |repeated binary ownerPhoneNumbers;
+      |repeated group contacts {
+      |required binary name;
+      |optional binary phoneNumber;
+      |}
+      |}
+    """.stripMargin
+
+  val testNestedDir1 = Utils.createTempDir()
+
+  lazy val testNestedData1 = new ParquetRelation(testNestedDir1.toURI.toString)
+
+  // Implicit
+  // TODO: get rid of this since it is confusing!
+  implicit def makePath(dir: File): Path = {
+    new Path(new Path(dir.toURI), new Path("part-r-0.parquet"))
+  }
+
   def writeFile() = {
-    testDir.delete
+    testDir.delete()
     val path: Path = new Path(new Path(testDir.toURI), new Path("part-r-0.parquet"))
+    val job = new Job()
+    val configuration: Configuration = ContextUtil.getConfiguration(job)
     val schema: MessageType = MessageTypeParser.parseMessageType(testSchema)
     val writeSupport = new TestGroupWriteSupport(schema)
     val writer = new ParquetWriter[Group](path, writeSupport)
@@ -150,5 +185,60 @@
     }
     writer.close()
   }
+
+  def writeNestedFile1() {
+    // example data from https://blog.twitter.com/2013/dremel-made-simple-with-parquet
+    testNestedDir1.delete()
+    val path: Path = testNestedDir1
+    val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema1)
+
+    val r1 = new SimpleGroup(schema)
+    r1.add(0, "Julien Le Dem")
+    r1.add(1, "555 123 4567")
+    r1.add(1, "555 666 1337")
+    r1.addGroup(2)
+      .append("name", "Dmitriy Ryaboy")
+      .append("phoneNumber", "555 987 6543")
+    r1.addGroup(2)
+      .append("name", "Chris Aniszczyk")
+
+    val r2 = new SimpleGroup(schema)
+    r2.add(0, "A. Nonymous")
+
+    // ParquetWriter initializes GroupWriteSupport with an empty configuration
+    // (it is after all not intended to be used in this way?)
+    // and members are private so we need to make our own
+    val writeSupport = new WriteSupport[Group] {
+      var groupWriter: GroupWriter = null
+      override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
+        groupWriter = new GroupWriter(recordConsumer, schema)
+      }
+      override def init(configuration: Configuration): WriteContext = {
+        new WriteContext(schema, new java.util.HashMap[String, String]())
+      }
+      override def write(record: Group) {
+        groupWriter.write(record)
+      }
+    }
+    val writer = new ParquetWriter[Group](path, writeSupport)
+    writer.write(r1)
+    writer.write(r2)
+    writer.close()
+  }
+
+  def readNestedFile(): Unit = {
+    val configuration = new Configuration()
+    val fs: FileSystem = testNestedDir1.getFileSystem(configuration)
+    val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema1)
+    val outputStatus: FileStatus = fs.getFileStatus(testNestedDir1)
+    val footers = ParquetFileReader.readFooter(configuration, outputStatus)
+    val reader = new ParquetReader(testNestedDir1, new GroupReadSupport())
+    val first = reader.read()
+    assert(first != null)
+    val second = reader.read()
+    assert(second != null)
+    assert(schema != null)
+    assert(footers != null)
+  }
 }
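
The new fixtures write the AddressBook example from the Dremel blog post into a temporary directory and read it back through parquet-mr's example GroupReadSupport. A small illustrative driver (not part of the commit; the object name is made up) that combines them with the relation defined above, assuming ParquetRelation exposes its schema through the usual LogicalPlan output field:

    package org.apache.spark.sql.parquet

    // Hypothetical driver, for illustration only.
    object NestedTestDataDriver {
      def main(args: Array[String]): Unit = {
        ParquetTestData.writeNestedFile1()  // writes two AddressBook records
        ParquetTestData.readNestedFile()    // reads them back and sanity-checks the footer
        // Inspect the Catalyst attributes derived from the nested Parquet schema.
        ParquetTestData.testNestedData1.output.foreach { attr =>
          println(s"${attr.name}: ${attr.dataType} (nullable = ${attr.nullable})")
        }
      }
    }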

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 37 additions & 0 deletions
@@ -65,6 +65,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
   override def beforeAll() {
     ParquetTestData.writeFile()
     ParquetTestData.writeFilterFile()
+    ParquetTestData.writeNestedFile1()
     testRDD = parquetFile(ParquetTestData.testDir.toString)
     testRDD.registerAsTable("testsource")
     parquetFile(ParquetTestData.testFilterDir.toString)
@@ -74,6 +75,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
   override def afterAll() {
     Utils.deleteRecursively(ParquetTestData.testDir)
     Utils.deleteRecursively(ParquetTestData.testFilterDir)
+    Utils.deleteRecursively(ParquetTestData.testNestedDir1)
     // here we should also unregister the table??
   }

@@ -363,4 +365,39 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll
     val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10")
     assert(query.collect().size === 10)
   }
+
+  test("Importing nested File") {
+    ParquetTestData.readNestedFile()
+    val result = getRDD(ParquetTestData.testNestedData1).collect()
+    /*assert(result.size === 15)
+    result.zipWithIndex.foreach {
+      case (row, index) => {
+        val checkBoolean =
+          if (index % 3 == 0)
+            row(0) == true
+          else
+            row(0) == false
+        assert(checkBoolean === true, s"boolean field value in line $index did not match")
+        if (index % 5 == 0) assert(row(1) === 5, s"int field value in line $index did not match")
+        assert(row(2) === "abc", s"string field value in line $index did not match")
+        assert(row(3) === (index.toLong << 33), s"long value in line $index did not match")
+        assert(row(4) === 2.5F, s"float field value in line $index did not match")
+        assert(row(5) === 4.5D, s"double field value in line $index did not match")
+      }
+    }*/
+  }
+
+  /**
+   * Creates an empty SchemaRDD backed by a ParquetRelation.
+   *
+   * TODO: since this is so experimental it is better to have it here and not
+   * in SQLContext. Also note that when creating new AttributeReferences
+   * one needs to take care not to create duplicate Attribute ID's.
+   */
+  private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = {
+    val attributes = schema.map(t => new AttributeReference(t._1, t._2)())
+    new SchemaRDD(
+      TestSQLContext,
+      parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
+  }
 }
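
The new test currently only checks that the nested file can be read back into rows; the detailed per-row assertions are left commented out. As an illustration only, a follow-up test along these lines could register the nested relation as a table and project one of its top-level columns, reusing the suite's existing getRDD, registerAsTable and sql helpers (the test name and query are hypothetical, and it is an assumption that projecting a binary-backed string column already works end to end at this point):

    test("Projecting a top-level column of the nested file") {
      val nested = getRDD(ParquetTestData.testNestedData1)
      nested.registerAsTable("nestedsource")
      val owners = sql("SELECT owner FROM nestedsource").collect()
      // writeNestedFile1 writes two AddressBook records (r1 and r2)
      assert(owners.size === 2)
    }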
