[SPARK-4987] [SQL] parquet timestamp type support
Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes apache#3820 from adrian-wang/parquettimestamp and squashes the following commits:

b1e2a0d [Daoyuan Wang] fix for nanos
4dadef1 [Daoyuan Wang] fix wrong read
93f438d [Daoyuan Wang] parquet timestamp support
adrian-wang authored and marmbrus committed Feb 3, 2015
1 parent 4204a12 commit 0c20ce6
Showing 11 changed files with 239 additions and 26 deletions.
9 changes: 9 additions & 0 deletions docs/sql-programming-guide.md
@@ -580,6 +580,15 @@ Configuration of Parquet can be done using the `setConf` method on SQLContext or
flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.int96AsTimestamp</code></td>
<td>true</td>
<td>
Some Parquet-producing systems, in particular Impala, store Timestamp as INT96. Spark SQL also
stores Timestamp as INT96 because we need to avoid the precision loss of the nanoseconds field. This
flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.cacheMetadata</code></td>
<td>true</td>
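The new `spark.sql.parquet.int96AsTimestamp` flag is read through `SQLConf` (see below). A minimal sketch of toggling it at runtime, e.g. from the spark-shell of this Spark era where `sc` is predefined (illustrative only, not part of this patch):

```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // sc: the shell's SparkContext

// Interpret INT96 columns as TimestampType (the default introduced by this commit):
sqlContext.setConf("spark.sql.parquet.int96AsTimestamp", "true")

// The same setting via SQL, e.g. when Impala compatibility is not needed:
sqlContext.sql("SET spark.sql.parquet.int96AsTimestamp=false")
```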
1 change: 1 addition & 0 deletions pom.xml
@@ -149,6 +149,7 @@
<scala.binary.version>2.10</scala.binary.version>
<jline.version>${scala.version}</jline.version>
<jline.groupid>org.scala-lang</jline.groupid>
<jodd.version>3.6.3</jodd.version>
<codehaus.jackson.version>1.8.8</codehaus.jackson.version>
<snappy.version>1.1.1.6</snappy.version>

5 changes: 5 additions & 0 deletions sql/core/pom.xml
@@ -66,6 +66,11 @@
<artifactId>jackson-databind</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.jodd</groupId>
<artifactId>jodd-core</artifactId>
<version>${jodd.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
7 changes: 7 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -33,6 +33,7 @@ private[spark] object SQLConf {
val DIALECT = "spark.sql.dialect"

val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp"
val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
@@ -143,6 +144,12 @@ private[sql] class SQLConf extends Serializable {
private[spark] def isParquetBinaryAsString: Boolean =
getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean

/**
* When set to true, we always treat INT96 values in Parquet files as timestamps.
*/
private[spark] def isParquetINT96AsTimestamp: Boolean =
getConf(PARQUET_INT96_AS_TIMESTAMP, "true").toBoolean

/**
* When set to true, partition pruning for in-memory columnar tables is enabled.
*/
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -17,15 +17,20 @@

package org.apache.spark.sql.parquet

import java.sql.Timestamp
import java.util.{TimeZone, Calendar}

import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap}

import jodd.datetime.JDateTime
import parquet.column.Dictionary
import parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter}
import parquet.schema.MessageType

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
import org.apache.spark.sql.types._
import org.apache.spark.sql.parquet.timestamp.NanoTime

/**
* Collection of converters of Parquet types (group and primitive types) that
@@ -123,6 +128,12 @@ private[sql] object CatalystConverter {
parent.updateDecimal(fieldIndex, value, d)
}
}
case TimestampType => {
new CatalystPrimitiveConverter(parent, fieldIndex) {
override def addBinary(value: Binary): Unit =
parent.updateTimestamp(fieldIndex, value)
}
}
// All other primitive types use the default converter
case ctype: PrimitiveType => { // note: need the type tag here!
new CatalystPrimitiveConverter(parent, fieldIndex)
@@ -197,9 +208,11 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
updateField(fieldIndex, value)

protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
updateField(fieldIndex, readTimestamp(value))

protected[parquet] def updateDecimal(fieldIndex: Int, value: Binary, ctype: DecimalType): Unit =
updateField(fieldIndex, readDecimal(new Decimal(), value, ctype))
}

protected[parquet] def isRootConverter: Boolean = parent == null

@@ -232,6 +245,13 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
unscaled = (unscaled << (64 - numBits)) >> (64 - numBits)
dest.set(unscaled, precision, scale)
}

/**
* Read a Timestamp value from a Parquet Int96Value
*/
protected[parquet] def readTimestamp(value: Binary): Timestamp = {
CatalystTimestampConverter.convertToTimestamp(value)
}
}

/**
@@ -384,6 +404,9 @@ private[parquet] class CatalystPrimitiveRowConverter(
override protected[parquet] def updateString(fieldIndex: Int, value: String): Unit =
current.setString(fieldIndex, value)

override protected[parquet] def updateTimestamp(fieldIndex: Int, value: Binary): Unit =
current.update(fieldIndex, readTimestamp(value))

override protected[parquet] def updateDecimal(
fieldIndex: Int, value: Binary, ctype: DecimalType): Unit = {
var decimal = current(fieldIndex).asInstanceOf[Decimal]
@@ -454,6 +477,73 @@ private[parquet] object CatalystArrayConverter {
val INITIAL_ARRAY_SIZE = 20
}

private[parquet] object CatalystTimestampConverter {
// TODO: most of this comes from Hive 0.14.
// The Hive code might have some issues, so we need to keep an eye on it.
// We also use NanoTime and Int96Values from parquet-examples,
// and jodd to convert between NanoTime and Timestamp.
val parquetTsCalendar = new ThreadLocal[Calendar]
def getCalendar = {
// this is a cache for the calendar instance.
if (parquetTsCalendar.get == null) {
parquetTsCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT")))
}
parquetTsCalendar.get
}
val NANOS_PER_SECOND: Long = 1000000000
val SECONDS_PER_MINUTE: Long = 60
val MINUTES_PER_HOUR: Long = 60
val NANOS_PER_MILLI: Long = 1000000

def convertToTimestamp(value: Binary): Timestamp = {
val nt = NanoTime.fromBinary(value)
val timeOfDayNanos = nt.getTimeOfDayNanos
val julianDay = nt.getJulianDay
val jDateTime = new JDateTime(julianDay.toDouble)
val calendar = getCalendar
calendar.set(Calendar.YEAR, jDateTime.getYear)
calendar.set(Calendar.MONTH, jDateTime.getMonth - 1)
calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay)

// written in imperative style: peel hours, minutes and seconds off the remainder
var remainder = timeOfDayNanos
calendar.set(
Calendar.HOUR_OF_DAY,
(remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)).toInt)
remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)
calendar.set(
Calendar.MINUTE, (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE)).toInt)
remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE)
calendar.set(Calendar.SECOND, (remainder / NANOS_PER_SECOND).toInt)
val nanos = remainder % NANOS_PER_SECOND
val ts = new Timestamp(calendar.getTimeInMillis)
ts.setNanos(nanos.toInt)
ts
}

def convertFromTimestamp(ts: Timestamp): Binary = {
val calendar = getCalendar
calendar.setTime(ts)
val jDateTime = new JDateTime(calendar.get(Calendar.YEAR),
calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH))
// Hive 0.14 didn't set the hour before getting the day number, although the day number
// depends on the hour, since the Julian day number increments at 12:00 GMT.
// Here we just follow what Hive does.
val julianDay = jDateTime.getJulianDayNumber

val hour = calendar.get(Calendar.HOUR_OF_DAY)
val minute = calendar.get(Calendar.MINUTE)
val second = calendar.get(Calendar.SECOND)
val nanos = ts.getNanos
// Hive 0.14 uses the hour directly, which might be wrong, since the Julian day starts
// at 12:00. Here we just follow what Hive does.
val nanosOfDay = nanos + second * NANOS_PER_SECOND +
minute * NANOS_PER_SECOND * SECONDS_PER_MINUTE +
hour * NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR
NanoTime(julianDay, nanosOfDay).toBinary
}
}

/**
* A `parquet.io.api.GroupConverter` that converts single-element groups that
* match the characteristics of an array (see
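`CatalystTimestampConverter` above decodes an INT96 value as a Julian day number plus nanoseconds within that day, then splits the nanoseconds into clock fields. A minimal, dependency-free sketch of that nanos-of-day decomposition (the object and names below are illustrative, not part of the patch):

```scala
// Illustrative sketch of the time-of-day arithmetic in convertToTimestamp:
// split nanos-of-day into hour, minute, second and leftover nanoseconds.
object NanosOfDayExample {
  val NanosPerSecond = 1000000000L
  val SecondsPerMinute = 60L
  val MinutesPerHour = 60L

  def decompose(timeOfDayNanos: Long): (Int, Int, Int, Int) = {
    var remainder = timeOfDayNanos
    val hour = (remainder / (NanosPerSecond * SecondsPerMinute * MinutesPerHour)).toInt
    remainder %= NanosPerSecond * SecondsPerMinute * MinutesPerHour
    val minute = (remainder / (NanosPerSecond * SecondsPerMinute)).toInt
    remainder %= NanosPerSecond * SecondsPerMinute
    val second = (remainder / NanosPerSecond).toInt
    val nanos = (remainder % NanosPerSecond).toInt
    (hour, minute, second, nanos)
  }

  def main(args: Array[String]): Unit = {
    // 01:02:03.000000004 expressed as nanoseconds since midnight
    val n = ((1 * 60 + 2) * 60 + 3) * NanosPerSecond + 4
    println(decompose(n)) // prints (1,2,3,4)
  }
}
```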
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -65,8 +65,8 @@ private[sql] case class ParquetRelation(
ParquetTypesConverter.readSchemaFromFile(
new Path(path.split(",").head),
conf,
sqlContext.conf.isParquetBinaryAsString)

sqlContext.conf.isParquetBinaryAsString,
sqlContext.conf.isParquetINT96AsTimestamp)
lazy val attributeMap = AttributeMap(output.map(o => o -> o))

override def newInstance() = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -83,7 +83,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
// TODO: Why it can be null?
if (schema == null) {
log.debug("falling back to Parquet read schema")
schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false)
schema = ParquetTypesConverter.convertToAttributes(
parquetSchema, false, true)
}
log.debug(s"list of attributes that will be read: $schema")
new RowRecordMaterializer(parquetSchema, schema)
@@ -184,12 +185,12 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
case t @ StructType(_) => writeStruct(
t,
value.asInstanceOf[CatalystConverter.StructScalaType[_]])
case _ => writePrimitive(schema.asInstanceOf[PrimitiveType], value)
case _ => writePrimitive(schema.asInstanceOf[NativeType], value)
}
}
}

private[parquet] def writePrimitive(schema: PrimitiveType, value: Any): Unit = {
private[parquet] def writePrimitive(schema: DataType, value: Any): Unit = {
if (value != null) {
schema match {
case StringType => writer.addBinary(
@@ -202,6 +203,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
case IntegerType => writer.addInteger(value.asInstanceOf[Int])
case ShortType => writer.addInteger(value.asInstanceOf[Short])
case LongType => writer.addLong(value.asInstanceOf[Long])
case TimestampType => writeTimestamp(value.asInstanceOf[java.sql.Timestamp])
case ByteType => writer.addInteger(value.asInstanceOf[Byte])
case DoubleType => writer.addDouble(value.asInstanceOf[Double])
case FloatType => writer.addFloat(value.asInstanceOf[Float])
@@ -307,6 +309,10 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes))
}

private[parquet] def writeTimestamp(ts: java.sql.Timestamp): Unit = {
val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp(ts)
writer.addBinary(binaryNanoTime)
}
}

// Optimized for non-nested rows
@@ -351,6 +357,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
case DoubleType => writer.addDouble(record.getDouble(index))
case FloatType => writer.addFloat(record.getFloat(index))
case BooleanType => writer.addBoolean(record.getBoolean(index))
case TimestampType => writeTimestamp(record(index).asInstanceOf[java.sql.Timestamp])
case d: DecimalType =>
if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer")
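With the read path (`updateTimestamp`/`readTimestamp`) and the write path (`writeTimestamp`) wired into `RowWriteSupport` and `MutableRowWriteSupport`, timestamp columns can round-trip through Parquet. A hedged end-to-end sketch, assuming the SchemaRDD API of this Spark era (`createSchemaRDD`, `saveAsParquetFile`, `parquetFile`), the spark-shell's predefined `sc`, and a hypothetical `/tmp/events.parquet` path:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SQLContext

// Sketch only, spark-shell style (sc is the shell's SparkContext).
val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD  // implicit: RDD[Event] -> SchemaRDD

case class Event(name: String, ts: Timestamp)

val events = sc.parallelize(Seq(Event("a", new Timestamp(0L))))
events.saveAsParquetFile("/tmp/events.parquet")  // TimestampType is written as INT96

// Reading back: INT96 columns become TimestampType because
// spark.sql.parquet.int96AsTimestamp defaults to true.
val readBack = sqlContext.parquetFile("/tmp/events.parquet")
readBack.registerTempTable("events")
sqlContext.sql("SELECT ts FROM events").collect().foreach(println)
```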
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.test.TestSQLContext

import parquet.example.data.{GroupWriter, Group}
import parquet.example.data.simple.SimpleGroup
import parquet.example.data.simple.{NanoTime, SimpleGroup}
import parquet.hadoop.{ParquetReader, ParquetFileReader, ParquetWriter}
import parquet.hadoop.api.WriteSupport
import parquet.hadoop.api.WriteSupport.WriteContext
@@ -63,6 +63,7 @@ private[sql] object ParquetTestData {
optional int64 mylong;
optional float myfloat;
optional double mydouble;
optional int96 mytimestamp;
}"""

// field names for test assertion error messages
@@ -72,7 +73,8 @@
"mystring:String",
"mylong:Long",
"myfloat:Float",
"mydouble:Double"
"mydouble:Double",
"mytimestamp:Timestamp"
)

val subTestSchema =
@@ -98,6 +100,7 @@
optional int64 myoptlong;
optional float myoptfloat;
optional double myoptdouble;
optional int96 mytimestamp;
}
"""

@@ -236,6 +239,7 @@ private[sql] object ParquetTestData {
record.add(3, i.toLong << 33)
record.add(4, 2.5F)
record.add(5, 4.5D)
record.add(6, new NanoTime(1,2))
writer.write(record)
}
writer.close()