Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Jun 11, 2015
1 parent ea196d4 commit 361fd62
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,25 @@

package org.apache.spark.sql.catalyst.util

import java.sql.{Timestamp, Date}
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.util.{Calendar, TimeZone}

import org.apache.spark.sql.catalyst.expressions.Cast

/**
* Helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
* Helper functions to convert between an Int number of days since the epoch and
* java.sql.Date, and between a Long count of 100-nanosecond intervals and java.sql.Timestamp.
*/
object DateTimeUtils {
private val MILLIS_PER_DAY = 86400000
private val HUNDRED_NANOS_PER_SECOND = 10000000L
final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L

// see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian
final val JULIAN_DAY_OF_EPOCH = 2440587 // and .5
final val SECONDS_PER_DAY = 60 * 60 * 24L
final val HUNDRED_NANOS_PER_SECOND = 1000L * 1000L * 10L
final val NANOS_PER_SECOND = HUNDRED_NANOS_PER_SECOND * 100


// Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
Expand Down Expand Up @@ -117,4 +124,27 @@ object DateTimeUtils {
0L
}
}

/**
 * Converts a Julian day number plus the nanoseconds elapsed in that day into the
 * number of 100-nanosecond intervals since the Unix epoch (1970-01-01 00:00:00 UTC).
 *
 * @param day Julian day number
 * @param nanoseconds nanoseconds elapsed within that Julian day
 * @return hundreds of nanoseconds since the Unix epoch
 */
def fromJulianDay(day: Int, nanoseconds: Long): Long = {
  // Integer arithmetic throughout avoids floating-point rounding errors.
  // Julian days begin at noon, hence the half-day (SECONDS_PER_DAY / 2) shift.
  val daysFromEpoch = (day - JULIAN_DAY_OF_EPOCH).toLong
  val secondsFromEpoch = daysFromEpoch * SECONDS_PER_DAY - SECONDS_PER_DAY / 2
  secondsFromEpoch * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L
}

/**
 * Splits a count of 100-nanosecond intervals since the Unix epoch into a
 * (Julian day, nanoseconds within that day) pair.
 *
 * @param num100ns hundreds of nanoseconds since 1970-01-01 00:00:00 UTC
 * @return pair of (Julian day number, nanoseconds elapsed within that day)
 */
def toJulianDay(num100ns: Long): (Int, Long) = {
  // Julian days start at noon, so shift forward by half a day before dividing.
  val totalSeconds = num100ns / HUNDRED_NANOS_PER_SECOND + SECONDS_PER_DAY / 2
  val julianDay = (totalSeconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH).toInt
  val secondsIntoDay = totalSeconds % SECONDS_PER_DAY
  val subSecondNanos = (num100ns % HUNDRED_NANOS_PER_SECOND) * 100L
  (julianDay, secondsIntoDay * NANOS_PER_SECOND + subSecondNanos)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import java.sql.Timestamp
import org.apache.spark.SparkFunSuite


class DateUtilsSuite extends SparkFunSuite {
class DateTimeUtilsSuite extends SparkFunSuite {

test("timestamp") {
test("timestamp and 100ns") {
val now = new Timestamp(System.currentTimeMillis())
now.setNanos(100)
val ns = DateTimeUtils.fromJavaTimestamp(now)
Expand All @@ -37,4 +37,16 @@ class DateUtilsSuite extends SparkFunSuite {
assert(DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJavaTimestamp(ts)) == ts)
}
}

test("100ns and julian day") {
  // The epoch instant corresponds to Julian day 2440587 at noon,
  // i.e. half a day's worth of nanoseconds into that Julian day.
  val (julianDay, nanos) = DateTimeUtils.toJulianDay(0)
  assert(julianDay == 2440587)
  assert(nanos == DateTimeUtils.SECONDS_PER_DAY / 2 * DateTimeUtils.NANOS_PER_SECOND)
  assert(DateTimeUtils.fromJulianDay(julianDay, nanos) == 0L)

  // A concrete timestamp must survive a round trip through the Julian-day encoding.
  val ts = new Timestamp(61394778610000L) // (2015, 6, 11, 10, 10, 10, 100)
  val (d, n) = DateTimeUtils.toJulianDay(DateTimeUtils.fromJavaTimestamp(ts))
  val roundTripped = DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJulianDay(d, n))
  assert(ts.equals(roundTripped))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@

package org.apache.spark.sql.parquet

import java.nio.{ByteBuffer, ByteOrder}

import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap}

import org.apache.parquet.Preconditions
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.MessageType

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.parquet.CatalystConverter.FieldType
import org.apache.spark.sql.parquet.timestamp.NanoTime
import org.apache.spark.sql.types._

/**
Expand Down Expand Up @@ -493,21 +496,23 @@ private[parquet] object CatalystArrayConverter {
}

private[parquet] object CatalystTimestampConverter {
// see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian
val JULIAN_DAY_OF_EPOCH = 2440587.5
val SECONDS_PER_DAY = 60 * 60 * 24

/**
 * Decodes a Parquet INT96 timestamp into hundreds of nanoseconds since the Unix epoch.
 *
 * The 12-byte layout is: 8-byte little-endian time-of-day nanoseconds followed by
 * a 4-byte little-endian Julian day number.
 *
 * @param value 12-byte Parquet binary timestamp
 * @return hundreds of nanoseconds since 1970-01-01 00:00:00 UTC
 */
def convertToTimestamp(value: Binary): Long = {
  // BUG FIX: the check referenced an undefined `bytes`; the parameter is `value`.
  Preconditions.checkArgument(value.length() == 12, "Must be 12 bytes")
  val buf = value.toByteBuffer
  buf.order(ByteOrder.LITTLE_ENDIAN)
  val timeOfDayNanos = buf.getLong
  val julianDay = buf.getInt
  DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
}

/**
 * Encodes hundreds of nanoseconds since the Unix epoch as a Parquet INT96 timestamp:
 * 8-byte little-endian time-of-day nanoseconds followed by a 4-byte little-endian
 * Julian day number (12 bytes total).
 *
 * @param ts hundreds of nanoseconds since 1970-01-01 00:00:00 UTC
 * @return 12-byte Parquet binary timestamp
 */
def convertFromTimestamp(ts: Long): Binary = {
  val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts)
  val buffer = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN)
  buffer.putLong(timeOfDayNanos)
  buffer.putInt(julianDay)
  buffer.flip()
  Binary.fromByteBuffer(buffer)
}
}

Expand Down

This file was deleted.

0 comments on commit 361fd62

Please sign in to comment.