Skip to content

[SPARK-16814][SQL] Fix deprecated parquet constructor usage #14419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.parquet.test.avro._
Expand All @@ -35,14 +36,14 @@ import org.apache.spark.sql.test.SharedSQLContext
class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with SharedSQLContext {
private def withWriter[T <: IndexedRecord]
(path: String, schema: Schema)
(f: AvroParquetWriter[T] => Unit): Unit = {
(f: ParquetWriter[T] => Unit): Unit = {
logInfo(
s"""Writing Avro records with the following Avro schema into Parquet file:
|
|${schema.toString(true)}
""".stripMargin)

val writer = new AvroParquetWriter[T](new Path(path), schema)
val writer = AvroParquetWriter.builder[T](new Path(path)).withSchema(schema).build()
try f(writer) finally writer.close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,18 @@ private[sql] object ParquetCompatibilityTest {
metadata: Map[String, String],
recordWriters: (RecordConsumer => Unit)*): Unit = {
val messageType = MessageTypeParser.parseMessageType(schema)
val writeSupport = new DirectWriteSupport(messageType, metadata)
val parquetWriter = new ParquetWriter[RecordConsumer => Unit](new Path(path), writeSupport)
val testWriteSupport = new DirectWriteSupport(messageType, metadata)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain what's going on here? This seems like pretty complicated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, so the parquetWriter constructors are deprecated now and its been replaced with a builder interface. For Avro and others there is a standard builder - but for sort "raw" formats you need to implement your own builder. This is equivalent to the old constructor we were using - you can see the deprecation in https://github.com/apache/parquet-mr/pull/199/files as well as how the builder interface ends up calling an equivalent (now protected) constructor. Also since our WriteSupport doesn't need to change based on the configuration we always return the same writesupport regardless of conf.

If it would be useful I can add some of this as a comment in the sourcecode.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rxin Does this explanation make sense for you?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might just go ahead and add it as a comment for good measure.

Isn't @Override the Java annotation? thought Scala needed @override but could be missing something.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know Java annotation passes Scala style checking. Should we consider adding a rule for this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, because I don't think it would actually cause the Scala compiler to verify it overrides. It's not illegal to use a Java annotation, just doesn't do anything in this case?

Copy link
Member

@HyukjinKwon HyukjinKwon Aug 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed here, https://issues.apache.org/jira/browse/SPARK-16877 but I will do some researches and take a look into this deeper first before opening a PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala just do override, not even @OverRide.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack yeah that's what I meant, thank you.

/**
* Provide a builder for constructing a parquet writer - after PARQUET-248 directly constructing
* the writer is deprecated and should be done through a builder. The default builders include
* Avro - but for raw Parquet writing we must create our own builder.
*/
class ParquetWriterBuilder() extends
ParquetWriter.Builder[RecordConsumer => Unit, ParquetWriterBuilder](new Path(path)) {
override def getWriteSupport(conf: Configuration) = testWriteSupport
override def self() = this
}
val parquetWriter = new ParquetWriterBuilder().build()
try recordWriters.foreach(parquetWriter.write) finally parquetWriter.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,20 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
|}
""".stripMargin)

val writeSupport = new TestGroupWriteSupport(schema)
val writer = new ParquetWriter[Group](path, writeSupport)
val testWriteSupport = new TestGroupWriteSupport(schema)
/**
* Provide a builder for constructing a parquet writer - after PARQUET-248 directly
* constructing the writer is deprecated and should be done through a builder. The default
* builders include Avro - but for raw Parquet writing we must create our own builder.
*/
class ParquetWriterBuilder() extends
ParquetWriter.Builder[Group, ParquetWriterBuilder](path) {
override def getWriteSupport(conf: Configuration) = testWriteSupport

override def self() = this
}

val writer = new ParquetWriterBuilder().build()

(0 until 10).foreach { i =>
val record = new SimpleGroup(schema)
Expand Down