[SPARK-8307] [SQL] improve timestamp from parquet
This PR changes the code to convert Julian day to Unix timestamp directly (without going through Calendar and Timestamp).
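A rough sketch of the new direct path (illustrative only, spark-shell style, with made-up values; the helpers are the DateTimeUtils methods added further down in this diff): the Julian day / nanoseconds-of-day pair that Parquet stores in an INT96 value is converted straight to the internal Long of 100ns intervals since the Unix epoch, and a java.sql.Timestamp is only materialized if a caller actually needs one.

import java.sql.Timestamp

import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Made-up values, as they would be read from a Parquet INT96 timestamp:
// a Julian day number plus the nanoseconds elapsed within that (noon-based) day.
val julianDay: Int = 2457004
val nanosOfDay: Long = 0L

// Direct conversion: Julian day -> 100ns since the Unix epoch, no Calendar involved.
val hundredNanos: Long = DateTimeUtils.fromJulianDay(julianDay, nanosOfDay)

// Only when an external java.sql.Timestamp is actually required:
val ts: Timestamp = DateTimeUtils.toJavaTimestamp(hundredNanos)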

cc adrian-wang rxin

Author: Davies Liu <davies@databricks.com>

Closes apache#6759 from davies/improve_ts and squashes the following commits:

849e301 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
b0e4cad [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
8e2d56f [Davies Liu] address comments
634b9f5 [Davies Liu] fix mima
4891efb [Davies Liu] address comment
bfc437c [Davies Liu] fix build
ae5979c [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
602b969 [Davies Liu] remove jodd
2f2e48c [Davies Liu] fix test
8ace611 [Davies Liu] fix mima
212143b [Davies Liu] fix mina
c834108 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
a3171b8 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts
5233974 [Davies Liu] fix scala style
361fd62 [Davies Liu] address comments
ea196d4 [Davies Liu] improve timestamp from parquet
Davies Liu committed Jun 23, 2015
1 parent 860a49e commit 6b7f2ce
Showing 24 changed files with 175 additions and 252 deletions.
1 change: 0 additions & 1 deletion pom.xml
@@ -156,7 +156,6 @@
<scala.binary.version>2.10</scala.binary.version>
<jline.version>${scala.version}</jline.version>
<jline.groupid>org.scala-lang</jline.groupid>
<jodd.version>3.6.3</jodd.version>
<codehaus.jackson.version>1.9.13</codehaus.jackson.version>
<fasterxml.jackson.version>2.4.4</fasterxml.jackson.version>
<snappy.version>1.1.1.7</snappy.version>
12 changes: 11 additions & 1 deletion project/MimaExcludes.scala
@@ -54,7 +54,17 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"),
// SQL execution is considered private.
excludePackage("org.apache.spark.sql.execution")
excludePackage("org.apache.spark.sql.execution"),
// NanoTime and CatalystTimestampConverter is only used inside catalyst,
// not needed anymore
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.timestamp.NanoTime"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.timestamp.NanoTime$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.CatalystTimestampConverter"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.parquet.CatalystTimestampConverter$")
)
case v if v.startsWith("1.4") =>
Seq(
@@ -19,15 +19,15 @@ package org.apache.spark.sql.catalyst

import java.lang.{Iterable => JavaIterable}
import java.math.{BigDecimal => JavaBigDecimal}
import java.sql.{Timestamp, Date}
import java.sql.{Date, Timestamp}
import java.util.{Map => JavaMap}
import javax.annotation.Nullable

import scala.collection.mutable.HashMap

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -272,18 +272,18 @@ object CatalystTypeConverters {
}

private object DateConverter extends CatalystTypeConverter[Date, Date, Any] {
override def toCatalystImpl(scalaValue: Date): Int = DateUtils.fromJavaDate(scalaValue)
override def toCatalystImpl(scalaValue: Date): Int = DateTimeUtils.fromJavaDate(scalaValue)
override def toScala(catalystValue: Any): Date =
if (catalystValue == null) null else DateUtils.toJavaDate(catalystValue.asInstanceOf[Int])
if (catalystValue == null) null else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
override def toScalaImpl(row: InternalRow, column: Int): Date = toScala(row.getInt(column))
}

private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
override def toCatalystImpl(scalaValue: Timestamp): Long =
DateUtils.fromJavaTimestamp(scalaValue)
DateTimeUtils.fromJavaTimestamp(scalaValue)
override def toScala(catalystValue: Any): Timestamp =
if (catalystValue == null) null
else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
else DateTimeUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
override def toScalaImpl(row: InternalRow, column: Int): Timestamp =
toScala(row.getLong(column))
}
@@ -24,7 +24,7 @@ import java.text.{DateFormat, SimpleDateFormat}
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -115,9 +115,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
case DateType => buildCast[Int](_, d => UTF8String.fromString(DateUtils.toString(d)))
case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.toString(d)))
case TimestampType => buildCast[Long](_,
t => UTF8String.fromString(timestampToString(DateUtils.toJavaTimestamp(t))))
t => UTF8String.fromString(timestampToString(DateTimeUtils.toJavaTimestamp(t))))
case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString))
}

@@ -162,7 +162,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
if (periodIdx != -1 && n.length() - periodIdx > 9) {
n = n.substring(0, periodIdx + 10)
}
try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n))
try DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(n))
catch { case _: java.lang.IllegalArgumentException => null }
})
case BooleanType =>
@@ -176,7 +176,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case ByteType =>
buildCast[Byte](_, b => longToTimestamp(b.toLong))
case DateType =>
buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000)
buildCast[Int](_, d => DateTimeUtils.toMillisSinceEpoch(d) * 10000)
// TimestampWritable.decimalToTimestamp
case DecimalType() =>
buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -225,13 +225,13 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
private[this] def castToDate(from: DataType): Any => Any = from match {
case StringType =>
buildCast[UTF8String](_, s =>
try DateUtils.fromJavaDate(Date.valueOf(s.toString))
try DateTimeUtils.fromJavaDate(Date.valueOf(s.toString))
catch { case _: java.lang.IllegalArgumentException => null }
)
case TimestampType =>
// throw valid precision more than seconds, according to Hive.
// Timestamp.nanos is in 0 to 999,999,999, no more than a second.
buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L))
buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 10000L))
// Hive throws this exception as a Semantic Exception
// It is never possible to compare result when hive return with exception,
// so we can return null
@@ -442,7 +442,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case (DateType, StringType) =>
defineCodeGen(ctx, ev, c =>
s"""${ctx.stringType}.fromString(
org.apache.spark.sql.catalyst.util.DateUtils.toString($c))""")
org.apache.spark.sql.catalyst.util.DateTimeUtils.toString($c))""")
// Special handling required for timestamps in hive test cases since the toString function
// does not match the expected output.
case (TimestampType, StringType) =>
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -39,8 +39,8 @@ object Literal {
case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: Decimal => Literal(d, DecimalType.Unlimited)
case t: Timestamp => Literal(DateUtils.fromJavaTimestamp(t), TimestampType)
case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
case a: Array[Byte] => Literal(a, BinaryType)
case null => Literal(null, NullType)
case _ =>
@@ -17,18 +17,28 @@

package org.apache.spark.sql.catalyst.util

import java.sql.{Timestamp, Date}
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.util.{Calendar, TimeZone}

import org.apache.spark.sql.catalyst.expressions.Cast

/**
* Helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
* Helper functions for converting between internal and external date and time representations.
* Dates are exposed externally as java.sql.Date and are represented internally as the number of
* days since the Unix epoch (1970-01-01). Timestamps are exposed externally as java.sql.Timestamp
* and are stored internally as longs, which are capable of storing timestamps with 100 nanosecond
* precision.
*/
object DateUtils {
private val MILLIS_PER_DAY = 86400000
private val HUNDRED_NANOS_PER_SECOND = 10000000L
object DateTimeUtils {
final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L

// see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian
final val JULIAN_DAY_OF_EPOCH = 2440587 // and .5
final val SECONDS_PER_DAY = 60 * 60 * 24L
final val HUNDRED_NANOS_PER_SECOND = 1000L * 1000L * 10L
final val NANOS_PER_SECOND = HUNDRED_NANOS_PER_SECOND * 100


// Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
@@ -117,4 +117,25 @@ object DateUtils {
0L
}
}

/**
* Return the number of 100ns (hundred of nanoseconds) since epoch from Julian day
* and nanoseconds in a day
*/
def fromJulianDay(day: Int, nanoseconds: Long): Long = {
// use Long to avoid rounding errors
val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2
seconds * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L
}

/**
* Return Julian day and nanoseconds in a day from the number of 100ns (hundred of nanoseconds)
*/
def toJulianDay(num100ns: Long): (Int, Long) = {
val seconds = num100ns / HUNDRED_NANOS_PER_SECOND + SECONDS_PER_DAY / 2
val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH
val secondsInDay = seconds % SECONDS_PER_DAY
val nanos = (num100ns % HUNDRED_NANOS_PER_SECOND) * 100L
(day.toInt, secondsInDay * NANOS_PER_SECOND + nanos)
}
}
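Going the other way (again only a sketch with made-up values, roughly the write-side counterpart): an internal 100ns timestamp splits back into the (Julian day, nanoseconds-of-day) pair, and feeding that pair through fromJulianDay recovers the original value, which is essentially what the new DateTimeUtilsSuite below asserts.

import java.sql.Timestamp

import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Internal representation: 100ns intervals since the Unix epoch.
val internal: Long = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-06-11 10:10:10"))

// Split into the Julian day and the nanoseconds within that day.
val (julianDay, nanosOfDay) = DateTimeUtils.toJulianDay(internal)

// Round-tripping through fromJulianDay yields the same internal value.
assert(DateTimeUtils.fromJulianDay(julianDay, nanosOfDay) == internal)

Note the half-day shifts in both helpers: Julian days begin at noon, so the Unix epoch sits SECONDS_PER_DAY / 2 seconds into day JULIAN_DAY_OF_EPOCH, exactly as the epoch test below expects.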
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.{Timestamp, Date}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

/**
@@ -156,7 +156,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(cast(sd, DateType), StringType), sd)
checkEvaluation(cast(cast(d, StringType), DateType), 0)
checkEvaluation(cast(cast(nts, TimestampType), StringType), nts)
checkEvaluation(cast(cast(ts, StringType), TimestampType), DateUtils.fromJavaTimestamp(ts))
checkEvaluation(cast(cast(ts, StringType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))

// all convert to string type to check
checkEvaluation(cast(cast(cast(nts, TimestampType), DateType), StringType), sd)
@@ -301,9 +301,10 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(ts, LongType), 15.toLong)
checkEvaluation(cast(ts, FloatType), 15.002f)
checkEvaluation(cast(ts, DoubleType), 15.002)
checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateUtils.fromJavaTimestamp(ts))
checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateUtils.fromJavaTimestamp(ts))
checkEvaluation(cast(cast(tss, LongType), TimestampType), DateUtils.fromJavaTimestamp(ts))
checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
checkEvaluation(cast(cast(tss, IntegerType), TimestampType),
DateTimeUtils.fromJavaTimestamp(ts))
checkEvaluation(cast(cast(tss, LongType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
checkEvaluation(
cast(cast(millis.toFloat / 1000, TimestampType), FloatType),
millis.toFloat / 1000)
@@ -23,7 +23,7 @@ import scala.collection.immutable.HashSet

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.{IntegerType, BooleanType}


@@ -167,8 +167,8 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Literal(true) <=> Literal.create(null, BooleanType), false, row)
checkEvaluation(Literal.create(null, BooleanType) <=> Literal(true), false, row)

val d1 = DateUtils.fromJavaDate(Date.valueOf("1970-01-01"))
val d2 = DateUtils.fromJavaDate(Date.valueOf("1970-01-02"))
val d1 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01"))
val d2 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-02"))
checkEvaluation(Literal(d1) < Literal(d2), true)

val ts1 = new Timestamp(12)
@@ -23,7 +23,7 @@ import java.util.Arrays
import org.scalatest.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.PlatformDependent
import org.apache.spark.unsafe.array.ByteArrayMethods
@@ -83,8 +83,8 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
val row = new SpecificMutableRow(fieldTypes)
row.setLong(0, 0)
row.setString(1, "Hello")
row.update(2, DateUtils.fromJavaDate(Date.valueOf("1970-01-01")))
row.update(3, DateUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25")))
row.update(2, DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01")))
row.update(3, DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25")))

val sizeRequired: Int = converter.getSizeRequirement(row)
sizeRequired should be (8 + (8 * 4) +
@@ -98,9 +98,9 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
unsafeRow.getLong(0) should be (0)
unsafeRow.getString(1) should be ("Hello")
// Date is represented as Int in unsafeRow
DateUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01"))
DateTimeUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01"))
// Timestamp is represented as Long in unsafeRow
DateUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be
DateTimeUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be
(Timestamp.valueOf("2015-05-08 08:10:25"))
}

@@ -21,19 +21,31 @@ import java.sql.Timestamp

import org.apache.spark.SparkFunSuite

class DateUtilsSuite extends SparkFunSuite {
class DateTimeUtilsSuite extends SparkFunSuite {

test("timestamp") {
test("timestamp and 100ns") {
val now = new Timestamp(System.currentTimeMillis())
now.setNanos(100)
val ns = DateUtils.fromJavaTimestamp(now)
assert(ns % 10000000L == 1)
assert(DateUtils.toJavaTimestamp(ns) == now)
val ns = DateTimeUtils.fromJavaTimestamp(now)
assert(ns % 10000000L === 1)
assert(DateTimeUtils.toJavaTimestamp(ns) === now)

List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t =>
val ts = DateUtils.toJavaTimestamp(t)
assert(DateUtils.fromJavaTimestamp(ts) == t)
assert(DateUtils.toJavaTimestamp(DateUtils.fromJavaTimestamp(ts)) == ts)
val ts = DateTimeUtils.toJavaTimestamp(t)
assert(DateTimeUtils.fromJavaTimestamp(ts) === t)
assert(DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJavaTimestamp(ts)) === ts)
}
}

test("100ns and julian day") {
val (d, ns) = DateTimeUtils.toJulianDay(0)
assert(d === DateTimeUtils.JULIAN_DAY_OF_EPOCH)
assert(ns === DateTimeUtils.SECONDS_PER_DAY / 2 * DateTimeUtils.NANOS_PER_SECOND)
assert(DateTimeUtils.fromJulianDay(d, ns) == 0L)

val t = new Timestamp(61394778610000L) // (2015, 6, 11, 10, 10, 10, 100)
val (d1, ns1) = DateTimeUtils.toJulianDay(DateTimeUtils.fromJavaTimestamp(t))
val t2 = DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJulianDay(d1, ns1))
assert(t.equals(t2))
}
}
5 changes: 0 additions & 5 deletions sql/core/pom.xml
@@ -73,11 +73,6 @@
<artifactId>jackson-databind</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>
<dependency>
<groupId>org.jodd</groupId>
<artifactId>jodd-core</artifactId>
<version>${jodd.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -34,7 +34,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -148,8 +148,8 @@ object EvaluatePython {

case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), udt.sqlType)

case (date: Int, DateType) => DateUtils.toJavaDate(date)
case (t: Long, TimestampType) => DateUtils.toJavaTimestamp(t)
case (date: Int, DateType) => DateTimeUtils.toJavaDate(date)
case (t: Long, TimestampType) => DateTimeUtils.toJavaTimestamp(t)
case (s: UTF8String, StringType) => s.toString

// Pyrolite can handle Timestamp and Decimal
@@ -188,12 +188,12 @@
}): Row

case (c: java.util.Calendar, DateType) =>
DateUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))
DateTimeUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))

case (c: java.util.Calendar, TimestampType) =>
c.getTimeInMillis * 10000L
case (t: java.sql.Timestamp, TimestampType) =>
DateUtils.fromJavaTimestamp(t)
DateTimeUtils.fromJavaTimestamp(t)

case (_, udt: UserDefinedType[_]) =>
fromJava(obj, udt.sqlType)