
Commit: address comment
Davies Liu committed Jun 19, 2015
1 parent bfc437c commit 4891efb
Showing 3 changed files with 26 additions and 31 deletions.
@@ -24,8 +24,11 @@ import java.util.{Calendar, TimeZone}
 import org.apache.spark.sql.catalyst.expressions.Cast
 
 /**
- * Helper function to convert between Int value of days since epoch and java.sql.Date,
- * also convert Long value of 100 nanoseconds and java.sql.Timestamp
+ * Helper functions for converting between internal and external date and time representations.
+ * Dates are exposed externally as java.sql.Date and are represented internally as the number of
+ * days since the Unix epoch (1970-01-01). Timestamps are exposed externally as java.sql.Timestamp
+ * and are stored internally as longs, which are capable of storing timestamps with 100 nanosecond
+ * precision.
  */
 object DateTimeUtils {
   final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L
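
A note on what those representations mean concretely: the sketch below is illustrative only, not Spark's API. The names RepresentationDemo, daysToDate, and hundredNanosToTimestamp are invented for this note, and time-zone offsets, which the real DateTimeUtils handles, are ignored here.

object RepresentationDemo {
  val MillisPerDay: Long = 86400L * 1000L
  val HundredNanosPerSecond: Long = 10000000L

  // Int days since 1970-01-01 -> java.sql.Date (time-zone handling omitted)
  def daysToDate(days: Int): java.sql.Date = new java.sql.Date(days * MillisPerDay)

  // Long count of 100ns ticks since the epoch -> java.sql.Timestamp
  // (non-negative inputs only, for brevity)
  def hundredNanosToTimestamp(num100ns: Long): java.sql.Timestamp = {
    val ts = new java.sql.Timestamp(num100ns / 10000L)             // millisecond part
    ts.setNanos(((num100ns % HundredNanosPerSecond) * 100L).toInt) // full sub-second part
    ts
  }

  def main(args: Array[String]): Unit = {
    println(daysToDate(0))                       // 1970-01-01 (in a UTC JVM)
    println(hundredNanosToTimestamp(12345678L))  // 1970-01-01 00:00:01.2345678
  }
}
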
@@ -126,19 +129,17 @@ object DateTimeUtils {
   }
 
   /**
-   * Return the number of 100ns (hundred of nanoseconds) since epoch from julian day
+   * Return the number of 100ns (hundreds of nanoseconds) since epoch from Julian day
    * and nanoseconds in a day
    */
   def fromJulianDay(day: Int, nanoseconds: Long): Long = {
-    // use integer to avoid rounding errors
+    // use Long to avoid rounding errors
     val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2
     seconds * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L
   }
 
   /**
-   * Return julian day and nanoseconds in a day from the number of 100ns (hundred of nanoseconds)
-   * @param num100ns
-   * @return
+   * Return Julian day and nanoseconds in a day from the number of 100ns (hundreds of nanoseconds)
    */
   def toJulianDay(num100ns: Long): (Int, Long) = {
     val seconds = num100ns / HUNDRED_NANOS_PER_SECOND + SECONDS_PER_DAY / 2
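
Because the body of toJulianDay is truncated in this view, the sketch below restates the conversion arithmetic as standalone Scala and checks that it round-trips. The constants are assumed to match Spark's (2440588 is the Julian day number of 1970-01-01; the half-day term reflects that Julian days begin at noon), and the inverse is a reconstruction for illustration, not necessarily the file's exact body.

object JulianDayDemo {
  val JulianDayOfEpoch = 2440588         // Julian day number of 1970-01-01
  val SecondsPerDay = 86400L
  val HundredNanosPerSecond = 10000000L  // 10^7 ticks of 100ns per second

  // same arithmetic as the fromJulianDay shown above
  def fromJulianDay(day: Int, nanoseconds: Long): Long = {
    val seconds = (day - JulianDayOfEpoch).toLong * SecondsPerDay - SecondsPerDay / 2
    seconds * HundredNanosPerSecond + nanoseconds / 100L
  }

  // reconstructed inverse (non-negative timestamps only)
  def toJulianDay(num100ns: Long): (Int, Long) = {
    val seconds = num100ns / HundredNanosPerSecond + SecondsPerDay / 2
    val day = (seconds / SecondsPerDay).toInt + JulianDayOfEpoch
    val secondsInDay = seconds % SecondsPerDay
    val nanos = (num100ns % HundredNanosPerSecond) * 100L
    (day, secondsInDay * 1000000000L + nanos)
  }

  def main(args: Array[String]): Unit = {
    val ts = fromJulianDay(2440589, 123456700L)
    assert(toJulianDay(ts) == (2440589, 123456700L))  // exact: input is a multiple of 100ns
    println(ts)  // 432001234567 ticks of 100ns since the Unix epoch
  }
}
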
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.parquet
 
-import java.nio.{ByteBuffer, ByteOrder}
+import java.nio.ByteOrder
 
 import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap}

@@ -267,7 +267,12 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
   /**
    * Read a Timestamp value from a Parquet Int96Value
    */
   protected[parquet] def readTimestamp(value: Binary): Long = {
-    CatalystTimestampConverter.convertToTimestamp(value)
+    Preconditions.checkArgument(value.length() == 12, "Must be 12 bytes")
+    val buf = value.toByteBuffer
+    buf.order(ByteOrder.LITTLE_ENDIAN)
+    val timeOfDayNanos = buf.getLong
+    val julianDay = buf.getInt
+    DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
   }
 }
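
The 12-byte Int96 layout that readTimestamp now parses inline is easy to exercise without Parquet or Guava (which supply Binary and Preconditions in the code above); the standalone sketch below, with invented names, uses plain java.nio instead.

import java.nio.{ByteBuffer, ByteOrder}

object Int96LayoutDemo {
  // little-endian: 8 bytes of time-of-day nanoseconds, then 4 bytes of Julian day
  def decode(bytes: Array[Byte]): (Int, Long) = {
    require(bytes.length == 12, "Must be 12 bytes")
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val timeOfDayNanos = buf.getLong
    val julianDay = buf.getInt
    (julianDay, timeOfDayNanos)
  }

  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
    buf.putLong(123456789L).putInt(2440589)
    println(decode(buf.array()))  // (2440589,123456789)
  }
}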

@@ -496,27 +501,6 @@ private[parquet] object CatalystArrayConverter {
   val INITIAL_ARRAY_SIZE = 20
 }
 
-private[parquet] object CatalystTimestampConverter {
-  def convertToTimestamp(value: Binary): Long = {
-    Preconditions.checkArgument(value.length() == 12, "Must be 12 bytes")
-    val buf = value.toByteBuffer
-    buf.order(ByteOrder.LITTLE_ENDIAN)
-    val timeOfDayNanos = buf.getLong
-    val julianDay = buf.getInt
-    DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
-  }
-
-  def convertFromTimestamp(ts: Long): Binary = {
-    val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts)
-    val buf = ByteBuffer.allocate(12)
-    buf.order(ByteOrder.LITTLE_ENDIAN)
-    buf.putLong(timeOfDayNanos)
-    buf.putInt(julianDay)
-    buf.flip()
-    Binary.fromByteBuffer(buf)
-  }
-}
-
 /**
  * A `parquet.io.api.GroupConverter` that converts a single-element groups that
  * match the characteristics of an array (see
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.parquet
 
+import java.nio.{ByteOrder, ByteBuffer}
 import java.util.{HashMap => JHashMap}
 
 import org.apache.hadoop.conf.Configuration
@@ -29,6 +30,7 @@ import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String

@@ -312,8 +314,16 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Logging {
     writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes))
   }
 
+  // array used to write Timestamp as Int96 (fixed-length binary)
+  private val int96buf = new Array[Byte](12)
+
   private[parquet] def writeTimestamp(ts: Long): Unit = {
-    writer.addBinary(CatalystTimestampConverter.convertFromTimestamp(ts))
+    val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts)
+    val buf = ByteBuffer.wrap(int96buf)
+    buf.order(ByteOrder.LITTLE_ENDIAN)
+    buf.putLong(timeOfDayNanos)
+    buf.putInt(julianDay)
+    writer.addBinary(Binary.fromByteArray(int96buf))
   }
 }
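
For the write side, the sketch below mirrors the new writeTimestamp logic with invented names: it packs the (julianDay, timeOfDayNanos) pair into a reused 12-byte little-endian scratch array, the same trick int96buf uses above to avoid allocating a fresh buffer per row.

import java.nio.{ByteBuffer, ByteOrder}

object Int96EncodeDemo {
  private val scratch = new Array[Byte](12)  // reused across calls, like int96buf

  def encode(julianDay: Int, timeOfDayNanos: Long): Array[Byte] = {
    val buf = ByteBuffer.wrap(scratch).order(ByteOrder.LITTLE_ENDIAN)
    buf.putLong(timeOfDayNanos)
    buf.putInt(julianDay)
    scratch  // caller must consume (or copy) before the next call
  }

  def main(args: Array[String]): Unit = {
    val bytes = encode(2440589, 123456789L)
    println(bytes.map(b => f"${b & 0xff}%02x").mkString(" "))
    // 15 cd 5b 07 00 00 00 00 8d 3d 25 00
  }
}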

