Skip to content

Commit ddb40d2

Browse files
Extending tests for nested Parquet data
1 parent 745a42b commit ddb40d2

File tree

2 files changed

+72
-26
lines changed

2 files changed

+72
-26
lines changed

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,34 @@ private[sql] object ParquetTestData {
130130
|}
131131
""".stripMargin
132132

133+
134+
// Parquet schema text for the second nested test file (message `TestNested2`).
// Top-level fields, in declaration order (the index is what positional
// Group accessors use):
//   0: firstInt            - required int32
//   1: secondInt           - optional int32
//   2: longs               - optional group wrapping a repeated int64 `values`
//   3: booleanNumberPairs  - required group of (required double, optional boolean)
//   4: outerouter          - required group -> required group `outer`
//                            -> required group `inner` -> required int32 `number`
val testNestedSchema2 =
135+
"""
136+
|message TestNested2 {
137+
|required int32 firstInt;
138+
|optional int32 secondInt;
139+
|optional group longs {
140+
|repeated int64 values;
141+
|}
142+
|required group booleanNumberPairs {
143+
|required double value;
144+
|optional boolean truth;
145+
|}
146+
|required group outerouter {
147+
|required group outer {
148+
|required group inner {
149+
|required int32 number;
150+
|}
151+
|}
152+
|}
153+
|}
154+
""".stripMargin
155+
133156
val testNestedDir1 = Utils.createTempDir()
157+
val testNestedDir2 = Utils.createTempDir()
134158

135159
lazy val testNestedData1 = new ParquetRelation(testNestedDir1.toURI.toString)
160+
lazy val testNestedData2 = new ParquetRelation(testNestedDir2.toURI.toString)
136161

137162
// Implicit
138163
// TODO: get rid of this since it is confusing!
@@ -216,40 +241,50 @@ private[sql] object ParquetTestData {
216241
val r2 = new SimpleGroup(schema)
217242
r2.add(0, "A. Nonymous")
218243

219-
// ParquetWriter initializes GroupWriteSupport with an empty configuration
220-
// (it is after all not intended to be used in this way?)
221-
// and members are private so we need to make our own
222-
val writeSupport = new WriteSupport[Group] {
223-
var groupWriter: GroupWriter = null
224-
override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
225-
groupWriter = new GroupWriter(recordConsumer, schema)
226-
}
227-
override def init(configuration: Configuration): WriteContext = {
228-
new WriteContext(schema, new java.util.HashMap[String, String]())
229-
}
230-
override def write(record: Group) {
231-
groupWriter.write(record)
232-
}
233-
}
244+
val writeSupport = new TestGroupWriteSupport(schema)
234245
val writer = new ParquetWriter[Group](path, writeSupport)
235246
writer.write(r1)
236247
writer.write(r2)
237248
writer.close()
238249
}
239250

240-
def readNestedFile(): Unit = {
251+
/**
 * Writes one record shaped by `testNestedSchema2` to `testNestedDir2`,
 * using a `ParquetWriter[Group]` backed by `TestGroupWriteSupport`.
 *
 * Fields are addressed by their position in the schema:
 * 0 = firstInt, 1 = secondInt, 2 = longs, 3 = booleanNumberPairs, 4 = outerouter.
 */
def writeNestedFile2() {
252+
// testNestedDir2 was created by Utils.createTempDir(); it is removed first,
// presumably so that the writer can create the target path itself - TODO confirm.
testNestedDir2.delete()
253+
// NOTE(review): this assignment appears to rely on an implicit conversion to
// Path declared elsewhere in this object (not visible in this hunk) - confirm.
val path: Path = testNestedDir2
254+
val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema2)
255+
256+
val r1 = new SimpleGroup(schema)
257+
// firstInt = 1, secondInt = 7
r1.add(0, 1)
258+
r1.add(1, 7)
259+
// Three entries for the repeated int64 `values`, each above the 32-bit range.
val longs = r1.addGroup(2)
260+
longs.add("values", 1.toLong << 32)
261+
longs.add("values", 1.toLong << 33)
262+
longs.add("values", 1.toLong << 34)
263+
val booleanNumberPairs = r1.addGroup(3)
264+
booleanNumberPairs.add("value", 2.5)
265+
booleanNumberPairs.add("truth", false)
266+
// NOTE(review): `outerouter` (field 4) is declared `required` in
// testNestedSchema2, yet addGroup(4) is invoked three times below, i.e. three
// separate outerouter -> outer -> inner chains are requested on the same record.
// Confirm whether the schema should declare the group `repeated`, or whether
// only a single chain (one `number`) should be written.
r1.addGroup(4).addGroup(0).addGroup(0).add("number", 7)
267+
r1.addGroup(4).addGroup(0).addGroup(0).add("number", 8)
268+
r1.addGroup(4).addGroup(0).addGroup(0).add("number", 9)
269+
270+
val writeSupport = new TestGroupWriteSupport(schema)
271+
val writer = new ParquetWriter[Group](path, writeSupport)
272+
writer.write(r1)
273+
// NOTE(review): close() is not in a finally block, so it is skipped if write() throws.
writer.close()
274+
}
275+
276+
277+
def readNestedFile(path: File, schemaString: String): Unit = {
241278
val configuration = new Configuration()
242-
val fs: FileSystem = testNestedDir1.getFileSystem(configuration)
243-
val schema: MessageType = MessageTypeParser.parseMessageType(testNestedSchema1)
244-
val outputStatus: FileStatus = fs.getFileStatus(testNestedDir1)
279+
val fs: FileSystem = path.getFileSystem(configuration)
280+
val schema: MessageType = MessageTypeParser.parseMessageType(schemaString)
281+
assert(schema != null)
282+
val outputStatus: FileStatus = fs.getFileStatus(path)
245283
val footers = ParquetFileReader.readFooter(configuration, outputStatus)
246-
val reader = new ParquetReader(testNestedDir1, new GroupReadSupport())
284+
assert(footers != null)
285+
val reader = new ParquetReader(path, new GroupReadSupport())
247286
val first = reader.read()
248287
assert(first != null)
249-
val second = reader.read()
250-
assert(second != null)
251-
assert(schema != null)
252-
assert(footers != null)
253288
}
254289
}
255290

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
6464
ParquetTestData.writeFile()
6565
ParquetTestData.writeFilterFile()
6666
ParquetTestData.writeNestedFile1()
67+
ParquetTestData.writeNestedFile2()
6768
testRDD = parquetFile(ParquetTestData.testDir.toString)
6869
testRDD.registerAsTable("testsource")
6970
parquetFile(ParquetTestData.testFilterDir.toString)
@@ -74,6 +75,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
7475
Utils.deleteRecursively(ParquetTestData.testDir)
7576
Utils.deleteRecursively(ParquetTestData.testFilterDir)
7677
Utils.deleteRecursively(ParquetTestData.testNestedDir1)
78+
Utils.deleteRecursively(ParquetTestData.testNestedDir2)
7779
// here we should also unregister the table??
7880
}
7981

@@ -197,7 +199,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
197199
assert(rdd_copy(i).apply(1) === rdd_orig(i).value, s"value in line $i")
198200
}
199201
Utils.deleteRecursively(file)
200-
assert(true)
201202
}
202203

203204
test("insert (appending) to same table via Scala API") {
@@ -365,7 +366,9 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
365366
}
366367

367368
test("Importing nested Parquet file (Addressbook)") {
368-
ParquetTestData.readNestedFile()
369+
ParquetTestData.readNestedFile(
370+
ParquetTestData.testNestedFile1,
371+
ParquetTestData.testNestedSchema1)
369372
val result = getRDD(ParquetTestData.testNestedData1).collect()
370373
assert(result != null)
371374
assert(result.size === 2)
@@ -386,6 +389,14 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
386389
assert(first_contacts.apply(1).apply(0) === "Chris Aniszczyk")
387390
}
388391

392+
// Smoke test for the second nested Parquet file: first reads it through
// ParquetTestData.readNestedFile, then collects it via getRDD on the
// testNestedData2 relation.
test("Importing nested Parquet file (nested numbers)") {
393+
// NOTE(review): `testNestedFile2` is not defined in the visible part of
// ParquetTestData (only `testNestedDir2` is) - confirm the member exists.
ParquetTestData.readNestedFile(
394+
ParquetTestData.testNestedFile2,
395+
ParquetTestData.testNestedSchema2)
396+
val result = getRDD(ParquetTestData.testNestedData2).collect()
397+
// Only non-nullness is checked; neither the row count nor any nested value
// is asserted here.
assert(result != null)
398+
}
399+
389400
/**
390401
* Creates an empty SchemaRDD backed by a ParquetRelation.
391402
*

0 commit comments

Comments
 (0)