Commit
improve timestamp from parquet
Davies Liu committed Jun 11, 2015
1 parent e84545f commit ea196d4
Showing 18 changed files with 91 additions and 147 deletions.
@@ -26,7 +26,7 @@ import javax.annotation.Nullable
import scala.collection.mutable.HashMap

import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

/**
@@ -269,18 +269,18 @@ object CatalystTypeConverters {
}

private object DateConverter extends CatalystTypeConverter[Date, Date, Any] {
-override def toCatalystImpl(scalaValue: Date): Int = DateUtils.fromJavaDate(scalaValue)
+override def toCatalystImpl(scalaValue: Date): Int = DateTimeUtils.fromJavaDate(scalaValue)
override def toScala(catalystValue: Any): Date =
-if (catalystValue == null) null else DateUtils.toJavaDate(catalystValue.asInstanceOf[Int])
+if (catalystValue == null) null else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int])
override def toScalaImpl(row: Row, column: Int): Date = toScala(row.getInt(column))
}

private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
override def toCatalystImpl(scalaValue: Timestamp): Long =
-DateUtils.fromJavaTimestamp(scalaValue)
+DateTimeUtils.fromJavaTimestamp(scalaValue)
override def toScala(catalystValue: Any): Timestamp =
if (catalystValue == null) null
-else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
+else DateTimeUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
override def toScalaImpl(row: Row, column: Int): Timestamp = toScala(row.getLong(column))
}
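As context for these converters: the catalyst side stores a date as an Int of days since 1970-01-01 and a timestamp as a Long counting 100-nanosecond intervals since the epoch. A minimal sketch of that representation, with illustrative helpers that are not the real DateTimeUtils API and that ignore time-zone handling:

    import java.sql.{Date, Timestamp}

    // Illustrative only: Int days since 1970-01-01, ignoring time zones.
    def daysSinceEpoch(d: Date): Int = (d.getTime / 86400000L).toInt

    // Illustrative only: Long count of 100-ns intervals since the epoch.
    def toHundredNanos(t: Timestamp): Long =
      t.getTime / 1000 * 10000000L + t.getNanos / 100

    val ts = new Timestamp(1434000000000L)   // around 2015-06-11 UTC
    ts.setNanos(100)                         // 100 ns past the whole second
    println(toHundredNanos(ts) % 10000000L)  // 1: a single 100-ns tick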

@@ -22,7 +22,7 @@ import java.text.{DateFormat, SimpleDateFormat}

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

/** Cast the child expression to the target data type. */
@@ -112,9 +112,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, UTF8String(_))
-case DateType => buildCast[Int](_, d => UTF8String(DateUtils.toString(d)))
+case DateType => buildCast[Int](_, d => UTF8String(DateTimeUtils.toString(d)))
case TimestampType => buildCast[Long](_,
-t => UTF8String(timestampToString(DateUtils.toJavaTimestamp(t))))
+t => UTF8String(timestampToString(DateTimeUtils.toJavaTimestamp(t))))
case _ => buildCast[Any](_, o => UTF8String(o.toString))
}

@@ -159,7 +159,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
if (periodIdx != -1 && n.length() - periodIdx > 9) {
n = n.substring(0, periodIdx + 10)
}
-try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n))
+try DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(n))
catch { case _: java.lang.IllegalArgumentException => null }
})
case BooleanType =>
@@ -173,7 +173,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case ByteType =>
buildCast[Byte](_, b => longToTimestamp(b.toLong))
case DateType =>
-buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000)
+buildCast[Int](_, d => DateTimeUtils.toMillisSinceEpoch(d) * 10000)
// TimestampWritable.decimalToTimestamp
case DecimalType() =>
buildCast[Decimal](_, d => decimalToTimestamp(d))
@@ -222,13 +222,13 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
private[this] def castToDate(from: DataType): Any => Any = from match {
case StringType =>
buildCast[UTF8String](_, s =>
-try DateUtils.fromJavaDate(Date.valueOf(s.toString))
+try DateTimeUtils.fromJavaDate(Date.valueOf(s.toString))
catch { case _: java.lang.IllegalArgumentException => null }
)
case TimestampType =>
// throw valid precision more than seconds, according to Hive.
// Timestamp.nanos is in 0 to 999,999,999, no more than a second.
-buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L))
+buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 10000L))
// Hive throws this exception as a Semantic Exception
// It is never possible to compare result when hive return with exception,
// so we can return null
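The scale factors in castToTimestamp and castToDate above follow directly from the internal units: multiplying epoch milliseconds by 10,000 yields 100-nanosecond ticks, and dividing ticks by 10,000 gets back to milliseconds before converting to days. A rough sketch of the arithmetic (time-zone handling omitted, names illustrative; the real casts delegate to DateTimeUtils):

    val millisPerDay = 86400000L

    // DateType -> TimestampType: days -> millis -> 100-ns ticks.
    def dateToTimestampTicks(days: Int): Long = days * millisPerDay * 10000L

    // TimestampType -> DateType: 100-ns ticks -> millis -> days (time of day dropped).
    def timestampTicksToDate(ticks: Long): Int = (ticks / 10000L / millisPerDay).toInt

    println(dateToTimestampTicks(1))             // 864000000000: one day of ticks
    println(timestampTicksToDate(864000000000L)) // 1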
@@ -439,7 +439,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case (DateType, StringType) =>
defineCodeGen(ctx, ev, c =>
s"""new ${ctx.stringType}().set(
-org.apache.spark.sql.catalyst.util.DateUtils.toString($c))""")
+org.apache.spark.sql.catalyst.util.DateTimeUtils.toString($c))""")
// Special handling required for timestamps in hive test cases since the toString function
// does not match the expected output.
case (TimestampType, StringType) =>
@@ -21,7 +21,7 @@ import java.sql.{Date, Timestamp}

import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

object Literal {
@@ -37,8 +37,8 @@ object Literal {
case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: Decimal => Literal(d, DecimalType.Unlimited)
-case t: Timestamp => Literal(DateUtils.fromJavaTimestamp(t), TimestampType)
-case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
+case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType)
+case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType)
case a: Array[Byte] => Literal(a, BinaryType)
case null => Literal(null, NullType)
case _ =>
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.Cast
/**
* Helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
*/
-object DateUtils {
+object DateTimeUtils {
private val MILLIS_PER_DAY = 86400000
private val HUNDRED_NANOS_PER_SECOND = 10000000L
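These two constants pin down the internal units: 86,400,000 milliseconds per day for the Int date value and 10,000,000 hundred-nanosecond intervals per second for the Long timestamp value. The relationships, as a small worked check (not library code):

    val ticksPerMilli  = 10000L                    // 100-ns intervals per millisecond
    val ticksPerSecond = 10000000L                 // HUNDRED_NANOS_PER_SECOND
    val ticksPerDay    = 86400000L * ticksPerMilli // 864,000,000,000
    assert(ticksPerSecond == 1000L * ticksPerMilli)
    assert(ticksPerDay == 86400L * ticksPerSecond)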

@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.sql.{Timestamp, Date}

import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

/**
@@ -138,7 +138,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(cast(sd, DateType), StringType), sd)
checkEvaluation(cast(cast(d, StringType), DateType), 0)
checkEvaluation(cast(cast(nts, TimestampType), StringType), nts)
-checkEvaluation(cast(cast(ts, StringType), TimestampType), DateUtils.fromJavaTimestamp(ts))
+checkEvaluation(cast(cast(ts, StringType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))

// all convert to string type to check
checkEvaluation(cast(cast(cast(nts, TimestampType), DateType), StringType), sd)
@@ -270,9 +270,9 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(cast(ts, LongType), 15.toLong)
checkEvaluation(cast(ts, FloatType), 15.002f)
checkEvaluation(cast(ts, DoubleType), 15.002)
-checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateUtils.fromJavaTimestamp(ts))
-checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateUtils.fromJavaTimestamp(ts))
-checkEvaluation(cast(cast(tss, LongType), TimestampType), DateUtils.fromJavaTimestamp(ts))
+checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
+checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
+checkEvaluation(cast(cast(tss, LongType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts))
checkEvaluation(
cast(cast(millis.toFloat / 1000, TimestampType), FloatType),
millis.toFloat / 1000)
@@ -23,7 +23,7 @@ import scala.collection.immutable.HashSet

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types.{IntegerType, BooleanType}


@@ -167,8 +167,8 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(Literal(true) <=> Literal.create(null, BooleanType), false, row)
checkEvaluation(Literal.create(null, BooleanType) <=> Literal(true), false, row)

-val d1 = DateUtils.fromJavaDate(Date.valueOf("1970-01-01"))
-val d2 = DateUtils.fromJavaDate(Date.valueOf("1970-01-02"))
+val d1 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01"))
+val d2 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-02"))
checkEvaluation(Literal(d1) < Literal(d2), true)

val ts1 = new Timestamp(12)
@@ -27,14 +27,14 @@ class DateUtilsSuite extends SparkFunSuite {
test("timestamp") {
val now = new Timestamp(System.currentTimeMillis())
now.setNanos(100)
-val ns = DateUtils.fromJavaTimestamp(now)
+val ns = DateTimeUtils.fromJavaTimestamp(now)
assert(ns % 10000000L == 1)
-assert(DateUtils.toJavaTimestamp(ns) == now)
+assert(DateTimeUtils.toJavaTimestamp(ns) == now)

List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t =>
-val ts = DateUtils.toJavaTimestamp(t)
-assert(DateUtils.fromJavaTimestamp(ts) == t)
-assert(DateUtils.toJavaTimestamp(DateUtils.fromJavaTimestamp(ts)) == ts)
+val ts = DateTimeUtils.toJavaTimestamp(t)
+assert(DateTimeUtils.fromJavaTimestamp(ts) == t)
+assert(DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJavaTimestamp(ts)) == ts)
}
}
}
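Worked out, the first assertion is plain unit arithmetic: setNanos(100) leaves 100 ns past the whole second, one internal tick is 100 ns, so 100 ns ÷ 100 ns per tick = 1 tick, and 1 mod 10,000,000 (ticks per second) is 1. The foreach over negative and positive tick values then checks that toJavaTimestamp and fromJavaTimestamp invert each other.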
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.{Accumulator, Logging => SparkLogging}

@@ -147,8 +147,8 @@ object EvaluatePython {

case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), udt.sqlType)

-case (date: Int, DateType) => DateUtils.toJavaDate(date)
-case (t: Long, TimestampType) => DateUtils.toJavaTimestamp(t)
+case (date: Int, DateType) => DateTimeUtils.toJavaDate(date)
+case (t: Long, TimestampType) => DateTimeUtils.toJavaTimestamp(t)
case (s: UTF8String, StringType) => s.toString

// Pyrolite can handle Timestamp and Decimal
@@ -187,12 +187,12 @@
}): Row

case (c: java.util.Calendar, DateType) =>
-DateUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))
+DateTimeUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis))

case (c: java.util.Calendar, TimestampType) =>
c.getTimeInMillis * 10000L
case (t: java.sql.Timestamp, TimestampType) =>
-DateUtils.fromJavaTimestamp(t)
+DateTimeUtils.fromJavaTimestamp(t)

case (_, udt: UserDefinedType[_]) =>
fromJava(obj, udt.sqlType)
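Both directions of the Python bridge use the same units as above: toJava maps the internal Int/Long back to java.sql.Date and java.sql.Timestamp, while fromJava scales epoch milliseconds by 10,000 to get 100-nanosecond ticks. A minimal sketch of the Calendar case, assuming only that scaling (helper name illustrative, not the real code):

    import java.util.{Calendar, TimeZone}

    // Illustrative only: java.util.Calendar -> internal Long of 100-ns ticks.
    def calendarToCatalystTimestamp(c: Calendar): Long = c.getTimeInMillis * 10000L

    val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
    cal.setTimeInMillis(1434000000000L)
    println(calendarToCatalystTimestamp(cal))  // 14340000000000000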
@@ -25,7 +25,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow}
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources._

@@ -382,10 +382,10 @@ private[sql] class JDBCRDD(
conversions(i) match {
case BooleanConversion => mutableRow.setBoolean(i, rs.getBoolean(pos))
case DateConversion =>
-// DateUtils.fromJavaDate does not handle null value, so we need to check it.
+// DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
val dateVal = rs.getDate(pos)
if (dateVal != null) {
-mutableRow.setInt(i, DateUtils.fromJavaDate(dateVal))
+mutableRow.setInt(i, DateTimeUtils.fromJavaDate(dateVal))
} else {
mutableRow.update(i, null)
}
@@ -420,7 +420,7 @@ private[sql] class JDBCRDD(
case TimestampConversion =>
val t = rs.getTimestamp(pos)
if (t != null) {
-mutableRow.setLong(i, DateUtils.fromJavaTimestamp(t))
+mutableRow.setLong(i, DateTimeUtils.fromJavaTimestamp(t))
} else {
mutableRow.update(i, null)
}
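The null checks here matter because java.sql.ResultSet returns null, not an exception, for SQL NULL dates and timestamps, while fromJavaDate and fromJavaTimestamp expect a non-null value. A condensed sketch of the same pattern, with an illustrative conversion standing in for the real DateTimeUtils call:

    import java.sql.Timestamp

    // Illustrative only: convert when non-null, otherwise keep SQL NULL as null.
    def toCatalystTimestampOrNull(t: Timestamp): Any =
      if (t != null) t.getTime / 1000 * 10000000L + t.getNanos / 100 else null

    println(toCatalystTimestampOrNull(Timestamp.valueOf("1970-01-01 00:00:01")))  // 10000000 on a UTC JVM
    println(toCatalystTimestampOrNull(null))                                      // null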
@@ -25,7 +25,7 @@ import com.fasterxml.jackson.core._

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._

@@ -61,10 +61,10 @@ private[sql] object JacksonParser {
null

case (VALUE_STRING, DateType) =>
-DateUtils.millisToDays(DateUtils.stringToTime(parser.getText).getTime)
+DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)

case (VALUE_STRING, TimestampType) =>
-DateUtils.stringToTime(parser.getText).getTime * 10000L
+DateTimeUtils.stringToTime(parser.getText).getTime * 10000L

case (VALUE_NUMBER_INT, TimestampType) =>
parser.getLongValue * 10000L
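For JSON input, a date string goes through stringToTime and then millisToDays, while timestamps (string or integer, the integer being epoch milliseconds) end up as milliseconds scaled to 100-nanosecond ticks. A rough sketch of that flow, using java.text.SimpleDateFormat as a stand-in for stringToTime, which this diff does not show:

    import java.text.SimpleDateFormat
    import java.util.TimeZone

    // Stand-in parser; the real stringToTime accepts more than a bare date.
    val fmt = new SimpleDateFormat("yyyy-MM-dd")
    fmt.setTimeZone(TimeZone.getTimeZone("UTC"))

    def jsonDateStringToDays(s: String): Int = (fmt.parse(s).getTime / 86400000L).toInt
    def jsonMillisToTimestampTicks(millis: Long): Long = millis * 10000L

    println(jsonDateStringToDays("1970-01-02"))  // 1
    println(jsonMillisToTimestampTicks(1000L))   // 10000000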
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.DateUtils
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._

private[sql] object JsonRDD extends Logging {
@@ -391,16 +391,16 @@ private[sql] object JsonRDD extends Logging {
value match {
// only support string as date
case value: java.lang.String =>
-DateUtils.millisToDays(DateUtils.stringToTime(value).getTime)
-case value: java.sql.Date => DateUtils.fromJavaDate(value)
+DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(value).getTime)
+case value: java.sql.Date => DateTimeUtils.fromJavaDate(value)
}
}

private def toTimestamp(value: Any): Long = {
value match {
case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 10000L
case value: java.lang.Long => value * 10000L
-case value: java.lang.String => DateUtils.stringToTime(value).getTime * 10000L
+case value: java.lang.String => DateTimeUtils.stringToTime(value).getTime * 10000L
}
}
