From 6b7f2ceafdcbb014791909747c2210b527305df9 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 22 Jun 2015 18:03:59 -0700 Subject: [PATCH] [SPARK-8307] [SQL] improve timestamp from parquet This PR change to convert julian day to unix timestamp directly (without Calendar and Timestamp). cc adrian-wang rxin Author: Davies Liu Closes #6759 from davies/improve_ts and squashes the following commits: 849e301 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts b0e4cad [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts 8e2d56f [Davies Liu] address comments 634b9f5 [Davies Liu] fix mima 4891efb [Davies Liu] address comment bfc437c [Davies Liu] fix build ae5979c [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts 602b969 [Davies Liu] remove jodd 2f2e48c [Davies Liu] fix test 8ace611 [Davies Liu] fix mima 212143b [Davies Liu] fix mina c834108 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts a3171b8 [Davies Liu] Merge branch 'master' of github.com:apache/spark into improve_ts 5233974 [Davies Liu] fix scala style 361fd62 [Davies Liu] address comments ea196d4 [Davies Liu] improve timestamp from parquet --- pom.xml | 1 - project/MimaExcludes.scala | 12 ++- .../sql/catalyst/CatalystTypeConverters.scala | 14 +-- .../spark/sql/catalyst/expressions/Cast.scala | 16 ++-- .../sql/catalyst/expressions/literals.scala | 6 +- .../{DateUtils.scala => DateTimeUtils.scala} | 41 +++++++-- .../sql/catalyst/expressions/CastSuite.scala | 11 +-- .../catalyst/expressions/PredicateSuite.scala | 6 +- .../expressions/UnsafeRowConverterSuite.scala | 10 +-- ...lsSuite.scala => DateTimeUtilsSuite.scala} | 28 ++++-- sql/core/pom.xml | 5 -- .../spark/sql/execution/pythonUdfs.scala | 10 +-- .../org/apache/spark/sql/jdbc/JDBCRDD.scala | 12 +-- .../apache/spark/sql/json/JacksonParser.scala | 6 +- .../org/apache/spark/sql/json/JsonRDD.scala | 8 +- .../spark/sql/parquet/ParquetConverter.scala | 86 +++---------------- .../sql/parquet/ParquetTableSupport.scala | 19 ++-- .../sql/parquet/timestamp/NanoTime.scala | 69 --------------- .../org/apache/spark/sql/json/JsonSuite.scala | 20 +++-- .../spark/sql/parquet/ParquetIOSuite.scala | 4 +- .../spark/sql/sources/TableScanSuite.scala | 11 ++- .../spark/sql/hive/HiveInspectors.scala | 20 ++--- .../apache/spark/sql/hive/TableReader.scala | 8 +- .../spark/sql/hive/hiveWriterContainers.scala | 4 +- 24 files changed, 175 insertions(+), 252 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/{DateUtils.scala => DateTimeUtils.scala} (68%) rename sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/{DateUtilsSuite.scala => DateTimeUtilsSuite.scala} (52%) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala diff --git a/pom.xml b/pom.xml index 6d4f717d4931b..80cacb5ace2d4 100644 --- a/pom.xml +++ b/pom.xml @@ -156,7 +156,6 @@ 2.10 ${scala.version} org.scala-lang - 3.6.3 1.9.13 2.4.4 1.1.1.7 diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 015d0296dd369..7a748fb5e38bd 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -54,7 +54,17 @@ object MimaExcludes { ProblemFilters.exclude[MissingMethodProblem]( "org.apache.spark.streaming.kafka.KafkaTestUtils.waitUntilLeaderOffset"), // SQL execution is considered private. - excludePackage("org.apache.spark.sql.execution") + excludePackage("org.apache.spark.sql.execution"), + // NanoTime and CatalystTimestampConverter is only used inside catalyst, + // not needed anymore + ProblemFilters.exclude[MissingClassProblem]( + "org.apache.spark.sql.parquet.timestamp.NanoTime"), + ProblemFilters.exclude[MissingClassProblem]( + "org.apache.spark.sql.parquet.timestamp.NanoTime$"), + ProblemFilters.exclude[MissingClassProblem]( + "org.apache.spark.sql.parquet.CatalystTimestampConverter"), + ProblemFilters.exclude[MissingClassProblem]( + "org.apache.spark.sql.parquet.CatalystTimestampConverter$") ) case v if v.startsWith("1.4") => Seq( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index 620e8de83a96c..429fc4077be9a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -19,15 +19,15 @@ package org.apache.spark.sql.catalyst import java.lang.{Iterable => JavaIterable} import java.math.{BigDecimal => JavaBigDecimal} -import java.sql.{Timestamp, Date} +import java.sql.{Date, Timestamp} import java.util.{Map => JavaMap} import javax.annotation.Nullable import scala.collection.mutable.HashMap -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -272,18 +272,18 @@ object CatalystTypeConverters { } private object DateConverter extends CatalystTypeConverter[Date, Date, Any] { - override def toCatalystImpl(scalaValue: Date): Int = DateUtils.fromJavaDate(scalaValue) + override def toCatalystImpl(scalaValue: Date): Int = DateTimeUtils.fromJavaDate(scalaValue) override def toScala(catalystValue: Any): Date = - if (catalystValue == null) null else DateUtils.toJavaDate(catalystValue.asInstanceOf[Int]) + if (catalystValue == null) null else DateTimeUtils.toJavaDate(catalystValue.asInstanceOf[Int]) override def toScalaImpl(row: InternalRow, column: Int): Date = toScala(row.getInt(column)) } private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] { override def toCatalystImpl(scalaValue: Timestamp): Long = - DateUtils.fromJavaTimestamp(scalaValue) + DateTimeUtils.fromJavaTimestamp(scalaValue) override def toScala(catalystValue: Any): Timestamp = if (catalystValue == null) null - else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long]) + else DateTimeUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long]) override def toScalaImpl(row: InternalRow, column: Int): Timestamp = toScala(row.getLong(column)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index ad920f287820c..d271434a306dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -24,7 +24,7 @@ import java.text.{DateFormat, SimpleDateFormat} import org.apache.spark.Logging import org.apache.spark.sql.catalyst import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode} -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -115,9 +115,9 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w // UDFToString private[this] def castToString(from: DataType): Any => Any = from match { case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes) - case DateType => buildCast[Int](_, d => UTF8String.fromString(DateUtils.toString(d))) + case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.toString(d))) case TimestampType => buildCast[Long](_, - t => UTF8String.fromString(timestampToString(DateUtils.toJavaTimestamp(t)))) + t => UTF8String.fromString(timestampToString(DateTimeUtils.toJavaTimestamp(t)))) case _ => buildCast[Any](_, o => UTF8String.fromString(o.toString)) } @@ -162,7 +162,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w if (periodIdx != -1 && n.length() - periodIdx > 9) { n = n.substring(0, periodIdx + 10) } - try DateUtils.fromJavaTimestamp(Timestamp.valueOf(n)) + try DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(n)) catch { case _: java.lang.IllegalArgumentException => null } }) case BooleanType => @@ -176,7 +176,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case ByteType => buildCast[Byte](_, b => longToTimestamp(b.toLong)) case DateType => - buildCast[Int](_, d => DateUtils.toMillisSinceEpoch(d) * 10000) + buildCast[Int](_, d => DateTimeUtils.toMillisSinceEpoch(d) * 10000) // TimestampWritable.decimalToTimestamp case DecimalType() => buildCast[Decimal](_, d => decimalToTimestamp(d)) @@ -225,13 +225,13 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w private[this] def castToDate(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => - try DateUtils.fromJavaDate(Date.valueOf(s.toString)) + try DateTimeUtils.fromJavaDate(Date.valueOf(s.toString)) catch { case _: java.lang.IllegalArgumentException => null } ) case TimestampType => // throw valid precision more than seconds, according to Hive. // Timestamp.nanos is in 0 to 999,999,999, no more than a second. - buildCast[Long](_, t => DateUtils.millisToDays(t / 10000L)) + buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 10000L)) // Hive throws this exception as a Semantic Exception // It is never possible to compare result when hive return with exception, // so we can return null @@ -442,7 +442,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w case (DateType, StringType) => defineCodeGen(ctx, ev, c => s"""${ctx.stringType}.fromString( - org.apache.spark.sql.catalyst.util.DateUtils.toString($c))""") + org.apache.spark.sql.catalyst.util.DateTimeUtils.toString($c))""") // Special handling required for timestamps in hive test cases since the toString function // does not match the expected output. case (TimestampType, StringType) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index 6c86a47ba200c..479224af5627a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp} import org.apache.spark.sql.catalyst import org.apache.spark.sql.catalyst.CatalystTypeConverters import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode} -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -39,8 +39,8 @@ object Literal { case d: BigDecimal => Literal(Decimal(d), DecimalType.Unlimited) case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited) case d: Decimal => Literal(d, DecimalType.Unlimited) - case t: Timestamp => Literal(DateUtils.fromJavaTimestamp(t), TimestampType) - case d: Date => Literal(DateUtils.fromJavaDate(d), DateType) + case t: Timestamp => Literal(DateTimeUtils.fromJavaTimestamp(t), TimestampType) + case d: Date => Literal(DateTimeUtils.fromJavaDate(d), DateType) case a: Array[Byte] => Literal(a, BinaryType) case null => Literal(null, NullType) case _ => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala similarity index 68% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 5cadc141af1df..ff79884a44d00 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -17,18 +17,28 @@ package org.apache.spark.sql.catalyst.util -import java.sql.{Timestamp, Date} +import java.sql.{Date, Timestamp} import java.text.SimpleDateFormat import java.util.{Calendar, TimeZone} import org.apache.spark.sql.catalyst.expressions.Cast /** - * Helper function to convert between Int value of days since 1970-01-01 and java.sql.Date + * Helper functions for converting between internal and external date and time representations. + * Dates are exposed externally as java.sql.Date and are represented internally as the number of + * dates since the Unix epoch (1970-01-01). Timestamps are exposed externally as java.sql.Timestamp + * and are stored internally as longs, which are capable of storing timestamps with 100 nanosecond + * precision. */ -object DateUtils { - private val MILLIS_PER_DAY = 86400000 - private val HUNDRED_NANOS_PER_SECOND = 10000000L +object DateTimeUtils { + final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L + + // see http://stackoverflow.com/questions/466321/convert-unix-timestamp-to-julian + final val JULIAN_DAY_OF_EPOCH = 2440587 // and .5 + final val SECONDS_PER_DAY = 60 * 60 * 24L + final val HUNDRED_NANOS_PER_SECOND = 1000L * 1000L * 10L + final val NANOS_PER_SECOND = HUNDRED_NANOS_PER_SECOND * 100 + // Java TimeZone has no mention of thread safety. Use thread local instance to be safe. private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] { @@ -117,4 +127,25 @@ object DateUtils { 0L } } + + /** + * Return the number of 100ns (hundred of nanoseconds) since epoch from Julian day + * and nanoseconds in a day + */ + def fromJulianDay(day: Int, nanoseconds: Long): Long = { + // use Long to avoid rounding errors + val seconds = (day - JULIAN_DAY_OF_EPOCH).toLong * SECONDS_PER_DAY - SECONDS_PER_DAY / 2 + seconds * HUNDRED_NANOS_PER_SECOND + nanoseconds / 100L + } + + /** + * Return Julian day and nanoseconds in a day from the number of 100ns (hundred of nanoseconds) + */ + def toJulianDay(num100ns: Long): (Int, Long) = { + val seconds = num100ns / HUNDRED_NANOS_PER_SECOND + SECONDS_PER_DAY / 2 + val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH + val secondsInDay = seconds % SECONDS_PER_DAY + val nanos = (num100ns % HUNDRED_NANOS_PER_SECOND) * 100L + (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala index e407f6f166e86..f3809be722a84 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.{Timestamp, Date} import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ /** @@ -156,7 +156,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(cast(cast(sd, DateType), StringType), sd) checkEvaluation(cast(cast(d, StringType), DateType), 0) checkEvaluation(cast(cast(nts, TimestampType), StringType), nts) - checkEvaluation(cast(cast(ts, StringType), TimestampType), DateUtils.fromJavaTimestamp(ts)) + checkEvaluation(cast(cast(ts, StringType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts)) // all convert to string type to check checkEvaluation(cast(cast(cast(nts, TimestampType), DateType), StringType), sd) @@ -301,9 +301,10 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(cast(ts, LongType), 15.toLong) checkEvaluation(cast(ts, FloatType), 15.002f) checkEvaluation(cast(ts, DoubleType), 15.002) - checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateUtils.fromJavaTimestamp(ts)) - checkEvaluation(cast(cast(tss, IntegerType), TimestampType), DateUtils.fromJavaTimestamp(ts)) - checkEvaluation(cast(cast(tss, LongType), TimestampType), DateUtils.fromJavaTimestamp(ts)) + checkEvaluation(cast(cast(tss, ShortType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts)) + checkEvaluation(cast(cast(tss, IntegerType), TimestampType), + DateTimeUtils.fromJavaTimestamp(ts)) + checkEvaluation(cast(cast(tss, LongType), TimestampType), DateTimeUtils.fromJavaTimestamp(ts)) checkEvaluation( cast(cast(millis.toFloat / 1000, TimestampType), FloatType), millis.toFloat / 1000) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala index b6261bfba0786..72fec3b86e5e4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/PredicateSuite.scala @@ -23,7 +23,7 @@ import scala.collection.immutable.HashSet import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types.{IntegerType, BooleanType} @@ -167,8 +167,8 @@ class PredicateSuite extends SparkFunSuite with ExpressionEvalHelper { checkEvaluation(Literal(true) <=> Literal.create(null, BooleanType), false, row) checkEvaluation(Literal.create(null, BooleanType) <=> Literal(true), false, row) - val d1 = DateUtils.fromJavaDate(Date.valueOf("1970-01-01")) - val d2 = DateUtils.fromJavaDate(Date.valueOf("1970-01-02")) + val d1 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01")) + val d2 = DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-02")) checkEvaluation(Literal(d1) < Literal(d2), true) val ts1 = new Timestamp(12) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala index d8f3351d6dff6..c0675f4f4dff6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala @@ -23,7 +23,7 @@ import java.util.Arrays import org.scalatest.Matchers import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.PlatformDependent import org.apache.spark.unsafe.array.ByteArrayMethods @@ -83,8 +83,8 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers { val row = new SpecificMutableRow(fieldTypes) row.setLong(0, 0) row.setString(1, "Hello") - row.update(2, DateUtils.fromJavaDate(Date.valueOf("1970-01-01"))) - row.update(3, DateUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25"))) + row.update(2, DateTimeUtils.fromJavaDate(Date.valueOf("1970-01-01"))) + row.update(3, DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-05-08 08:10:25"))) val sizeRequired: Int = converter.getSizeRequirement(row) sizeRequired should be (8 + (8 * 4) + @@ -98,9 +98,9 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers { unsafeRow.getLong(0) should be (0) unsafeRow.getString(1) should be ("Hello") // Date is represented as Int in unsafeRow - DateUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01")) + DateTimeUtils.toJavaDate(unsafeRow.getInt(2)) should be (Date.valueOf("1970-01-01")) // Timestamp is represented as Long in unsafeRow - DateUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be + DateTimeUtils.toJavaTimestamp(unsafeRow.getLong(3)) should be (Timestamp.valueOf("2015-05-08 08:10:25")) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala similarity index 52% rename from sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 4d8fe4ac5e78f..03eb64f097a37 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -21,19 +21,31 @@ import java.sql.Timestamp import org.apache.spark.SparkFunSuite -class DateUtilsSuite extends SparkFunSuite { +class DateTimeUtilsSuite extends SparkFunSuite { - test("timestamp") { + test("timestamp and 100ns") { val now = new Timestamp(System.currentTimeMillis()) now.setNanos(100) - val ns = DateUtils.fromJavaTimestamp(now) - assert(ns % 10000000L == 1) - assert(DateUtils.toJavaTimestamp(ns) == now) + val ns = DateTimeUtils.fromJavaTimestamp(now) + assert(ns % 10000000L === 1) + assert(DateTimeUtils.toJavaTimestamp(ns) === now) List(-111111111111L, -1L, 0, 1L, 111111111111L).foreach { t => - val ts = DateUtils.toJavaTimestamp(t) - assert(DateUtils.fromJavaTimestamp(ts) == t) - assert(DateUtils.toJavaTimestamp(DateUtils.fromJavaTimestamp(ts)) == ts) + val ts = DateTimeUtils.toJavaTimestamp(t) + assert(DateTimeUtils.fromJavaTimestamp(ts) === t) + assert(DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJavaTimestamp(ts)) === ts) } } + + test("100ns and julian day") { + val (d, ns) = DateTimeUtils.toJulianDay(0) + assert(d === DateTimeUtils.JULIAN_DAY_OF_EPOCH) + assert(ns === DateTimeUtils.SECONDS_PER_DAY / 2 * DateTimeUtils.NANOS_PER_SECOND) + assert(DateTimeUtils.fromJulianDay(d, ns) == 0L) + + val t = new Timestamp(61394778610000L) // (2015, 6, 11, 10, 10, 10, 100) + val (d1, ns1) = DateTimeUtils.toJulianDay(DateTimeUtils.fromJavaTimestamp(t)) + val t2 = DateTimeUtils.toJavaTimestamp(DateTimeUtils.fromJulianDay(d1, ns1)) + assert(t.equals(t2)) + } } diff --git a/sql/core/pom.xml b/sql/core/pom.xml index ed75475a87067..8fc16928adbd9 100644 --- a/sql/core/pom.xml +++ b/sql/core/pom.xml @@ -73,11 +73,6 @@ jackson-databind ${fasterxml.jackson.version} - - org.jodd - jodd-core - ${jodd.version} - junit junit diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index c8c67ce334002..6db551c543a9c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -148,8 +148,8 @@ object EvaluatePython { case (ud, udt: UserDefinedType[_]) => toJava(udt.serialize(ud), udt.sqlType) - case (date: Int, DateType) => DateUtils.toJavaDate(date) - case (t: Long, TimestampType) => DateUtils.toJavaTimestamp(t) + case (date: Int, DateType) => DateTimeUtils.toJavaDate(date) + case (t: Long, TimestampType) => DateTimeUtils.toJavaTimestamp(t) case (s: UTF8String, StringType) => s.toString // Pyrolite can handle Timestamp and Decimal @@ -188,12 +188,12 @@ object EvaluatePython { }): Row case (c: java.util.Calendar, DateType) => - DateUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis)) + DateTimeUtils.fromJavaDate(new java.sql.Date(c.getTimeInMillis)) case (c: java.util.Calendar, TimestampType) => c.getTimeInMillis * 10000L case (t: java.sql.Timestamp, TimestampType) => - DateUtils.fromJavaTimestamp(t) + DateTimeUtils.fromJavaTimestamp(t) case (_, udt: UserDefinedType[_]) => fromJava(obj, udt.sqlType) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index 226b143923df6..8b4276b2c364c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -22,13 +22,13 @@ import java.util.Properties import org.apache.commons.lang3.StringUtils -import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{InternalRow, SpecificMutableRow} -import org.apache.spark.sql.catalyst.util.DateUtils -import org.apache.spark.sql.types._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} /** * Data corresponding to one partition of a JDBCRDD. @@ -383,10 +383,10 @@ private[sql] class JDBCRDD( conversions(i) match { case BooleanConversion => mutableRow.setBoolean(i, rs.getBoolean(pos)) case DateConversion => - // DateUtils.fromJavaDate does not handle null value, so we need to check it. + // DateTimeUtils.fromJavaDate does not handle null value, so we need to check it. val dateVal = rs.getDate(pos) if (dateVal != null) { - mutableRow.setInt(i, DateUtils.fromJavaDate(dateVal)) + mutableRow.setInt(i, DateTimeUtils.fromJavaDate(dateVal)) } else { mutableRow.update(i, null) } @@ -421,7 +421,7 @@ private[sql] class JDBCRDD( case TimestampConversion => val t = rs.getTimestamp(pos) if (t != null) { - mutableRow.setLong(i, DateUtils.fromJavaTimestamp(t)) + mutableRow.setLong(i, DateTimeUtils.fromJavaTimestamp(t)) } else { mutableRow.update(i, null) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala index 817e8a20b34de..6222addc9aa3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala @@ -25,7 +25,7 @@ import com.fasterxml.jackson.core._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.json.JacksonUtils.nextUntil import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -63,10 +63,10 @@ private[sql] object JacksonParser { null case (VALUE_STRING, DateType) => - DateUtils.millisToDays(DateUtils.stringToTime(parser.getText).getTime) + DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime) case (VALUE_STRING, TimestampType) => - DateUtils.stringToTime(parser.getText).getTime * 10000L + DateTimeUtils.stringToTime(parser.getText).getTime * 10000L case (VALUE_NUMBER_INT, TimestampType) => parser.getLongValue * 10000L diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index 44594c5080ff4..73d9520d6f53f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -393,8 +393,8 @@ private[sql] object JsonRDD extends Logging { value match { // only support string as date case value: java.lang.String => - DateUtils.millisToDays(DateUtils.stringToTime(value).getTime) - case value: java.sql.Date => DateUtils.fromJavaDate(value) + DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(value).getTime) + case value: java.sql.Date => DateTimeUtils.fromJavaDate(value) } } @@ -402,7 +402,7 @@ private[sql] object JsonRDD extends Logging { value match { case value: java.lang.Integer => value.asInstanceOf[Int].toLong * 10000L case value: java.lang.Long => value * 10000L - case value: java.lang.String => DateUtils.stringToTime(value).getTime * 10000L + case value: java.lang.String => DateTimeUtils.stringToTime(value).getTime * 10000L } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index 4da5e96b82e3d..cf7aa44e4cd55 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -17,21 +17,19 @@ package org.apache.spark.sql.parquet -import java.sql.Timestamp -import java.util.{TimeZone, Calendar} +import java.nio.ByteOrder -import scala.collection.mutable.{Buffer, ArrayBuffer, HashMap} +import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap} -import jodd.datetime.JDateTime +import org.apache.parquet.Preconditions import org.apache.parquet.column.Dictionary -import org.apache.parquet.io.api.{PrimitiveConverter, GroupConverter, Binary, Converter} +import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} import org.apache.parquet.schema.MessageType import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.parquet.CatalystConverter.FieldType import org.apache.spark.sql.types._ -import org.apache.spark.sql.parquet.timestamp.NanoTime import org.apache.spark.unsafe.types.UTF8String /** @@ -269,7 +267,12 @@ private[parquet] abstract class CatalystConverter extends GroupConverter { * Read a Timestamp value from a Parquet Int96Value */ protected[parquet] def readTimestamp(value: Binary): Long = { - DateUtils.fromJavaTimestamp(CatalystTimestampConverter.convertToTimestamp(value)) + Preconditions.checkArgument(value.length() == 12, "Must be 12 bytes") + val buf = value.toByteBuffer + buf.order(ByteOrder.LITTLE_ENDIAN) + val timeOfDayNanos = buf.getLong + val julianDay = buf.getInt + DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos) } } @@ -498,73 +501,6 @@ private[parquet] object CatalystArrayConverter { val INITIAL_ARRAY_SIZE = 20 } -private[parquet] object CatalystTimestampConverter { - // TODO most part of this comes from Hive-0.14 - // Hive code might have some issues, so we need to keep an eye on it. - // Also we use NanoTime and Int96Values from parquet-examples. - // We utilize jodd to convert between NanoTime and Timestamp - val parquetTsCalendar = new ThreadLocal[Calendar] - def getCalendar: Calendar = { - // this is a cache for the calendar instance. - if (parquetTsCalendar.get == null) { - parquetTsCalendar.set(Calendar.getInstance(TimeZone.getTimeZone("GMT"))) - } - parquetTsCalendar.get - } - val NANOS_PER_SECOND: Long = 1000000000 - val SECONDS_PER_MINUTE: Long = 60 - val MINUTES_PER_HOUR: Long = 60 - val NANOS_PER_MILLI: Long = 1000000 - - def convertToTimestamp(value: Binary): Timestamp = { - val nt = NanoTime.fromBinary(value) - val timeOfDayNanos = nt.getTimeOfDayNanos - val julianDay = nt.getJulianDay - val jDateTime = new JDateTime(julianDay.toDouble) - val calendar = getCalendar - calendar.set(Calendar.YEAR, jDateTime.getYear) - calendar.set(Calendar.MONTH, jDateTime.getMonth - 1) - calendar.set(Calendar.DAY_OF_MONTH, jDateTime.getDay) - - // written in command style - var remainder = timeOfDayNanos - calendar.set( - Calendar.HOUR_OF_DAY, - (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR)).toInt) - remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR) - calendar.set( - Calendar.MINUTE, (remainder / (NANOS_PER_SECOND * SECONDS_PER_MINUTE)).toInt) - remainder = remainder % (NANOS_PER_SECOND * SECONDS_PER_MINUTE) - calendar.set(Calendar.SECOND, (remainder / NANOS_PER_SECOND).toInt) - val nanos = remainder % NANOS_PER_SECOND - val ts = new Timestamp(calendar.getTimeInMillis) - ts.setNanos(nanos.toInt) - ts - } - - def convertFromTimestamp(ts: Timestamp): Binary = { - val calendar = getCalendar - calendar.setTime(ts) - val jDateTime = new JDateTime(calendar.get(Calendar.YEAR), - calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH)) - // Hive-0.14 didn't set hour before get day number, while the day number should - // has something to do with hour, since julian day number grows at 12h GMT - // here we just follow what hive does. - val julianDay = jDateTime.getJulianDayNumber - - val hour = calendar.get(Calendar.HOUR_OF_DAY) - val minute = calendar.get(Calendar.MINUTE) - val second = calendar.get(Calendar.SECOND) - val nanos = ts.getNanos - // Hive-0.14 would use hours directly, that might be wrong, since the day starts - // from 12h in Julian. here we just follow what hive does. - val nanosOfDay = nanos + second * NANOS_PER_SECOND + - minute * NANOS_PER_SECOND * SECONDS_PER_MINUTE + - hour * NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR - NanoTime(julianDay, nanosOfDay).toBinary - } -} - /** * A `parquet.io.api.GroupConverter` that converts a single-element groups that * match the characteristics of an array (see diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index a8775a2a8fd83..e65fa0030e179 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.parquet +import java.nio.{ByteOrder, ByteBuffer} import java.util.{HashMap => JHashMap} import org.apache.hadoop.conf.Configuration @@ -29,7 +30,7 @@ import org.apache.parquet.schema.MessageType import org.apache.spark.Logging import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow} -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -298,7 +299,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo } // Scratch array used to write decimals as fixed-length binary - private val scratchBytes = new Array[Byte](8) + private[this] val scratchBytes = new Array[Byte](8) private[parquet] def writeDecimal(decimal: Decimal, precision: Int): Unit = { val numBytes = ParquetTypesConverter.BYTES_FOR_PRECISION(precision) @@ -313,10 +314,16 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes)) } + // array used to write Timestamp as Int96 (fixed-length binary) + private[this] val int96buf = new Array[Byte](12) + private[parquet] def writeTimestamp(ts: Long): Unit = { - val binaryNanoTime = CatalystTimestampConverter.convertFromTimestamp( - DateUtils.toJavaTimestamp(ts)) - writer.addBinary(binaryNanoTime) + val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(ts) + val buf = ByteBuffer.wrap(int96buf) + buf.order(ByteOrder.LITTLE_ENDIAN) + buf.putLong(timeOfDayNanos) + buf.putInt(julianDay) + writer.addBinary(Binary.fromByteArray(int96buf)) } } @@ -360,7 +367,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport { case FloatType => writer.addFloat(record.getFloat(index)) case BooleanType => writer.addBoolean(record.getBoolean(index)) case DateType => writer.addInteger(record.getInt(index)) - case TimestampType => writeTimestamp(record(index).asInstanceOf[Long]) + case TimestampType => writeTimestamp(record.getLong(index)) case d: DecimalType => if (d.precisionInfo == None || d.precisionInfo.get.precision > 18) { sys.error(s"Unsupported datatype $d, cannot write to consumer") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala deleted file mode 100644 index 4d5ed211ad0c0..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/timestamp/NanoTime.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.parquet.timestamp - -import java.nio.{ByteBuffer, ByteOrder} - -import org.apache.parquet.Preconditions -import org.apache.parquet.io.api.{Binary, RecordConsumer} - -private[parquet] class NanoTime extends Serializable { - private var julianDay = 0 - private var timeOfDayNanos = 0L - - def set(julianDay: Int, timeOfDayNanos: Long): this.type = { - this.julianDay = julianDay - this.timeOfDayNanos = timeOfDayNanos - this - } - - def getJulianDay: Int = julianDay - - def getTimeOfDayNanos: Long = timeOfDayNanos - - def toBinary: Binary = { - val buf = ByteBuffer.allocate(12) - buf.order(ByteOrder.LITTLE_ENDIAN) - buf.putLong(timeOfDayNanos) - buf.putInt(julianDay) - buf.flip() - Binary.fromByteBuffer(buf) - } - - def writeValue(recordConsumer: RecordConsumer): Unit = { - recordConsumer.addBinary(toBinary) - } - - override def toString: String = - "NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}" -} - -private[sql] object NanoTime { - def fromBinary(bytes: Binary): NanoTime = { - Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes") - val buf = bytes.toByteBuffer - buf.order(ByteOrder.LITTLE_ENDIAN) - val timeOfDayNanos = buf.getLong - val julianDay = buf.getInt - new NanoTime().set(julianDay, timeOfDayNanos) - } - - def apply(julianDay: Int, timeOfDayNanos: Long): NanoTime = { - new NanoTime().set(julianDay, timeOfDayNanos) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index c32d9f88dd6ee..8204a584179bb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -25,7 +25,7 @@ import org.scalactic.Tolerance._ import org.apache.spark.sql.{QueryTest, Row, SQLConf} import org.apache.spark.sql.TestData._ -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.json.InferSchema.compatibleType import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ @@ -76,26 +76,28 @@ class JsonSuite extends QueryTest with TestJsonData { checkTypePromotion( Decimal(doubleNumber), enforceCorrectType(doubleNumber, DecimalType.Unlimited)) - checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber)), + checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber)), enforceCorrectType(intNumber, TimestampType)) - checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)), + checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)), enforceCorrectType(intNumber.toLong, TimestampType)) val strTime = "2014-09-30 12:34:56" - checkTypePromotion(DateUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)), + checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)), enforceCorrectType(strTime, TimestampType)) val strDate = "2014-10-15" checkTypePromotion( - DateUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType)) + DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType)) val ISO8601Time1 = "1970-01-01T01:00:01.0Z" - checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(3601000)), + checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)), enforceCorrectType(ISO8601Time1, TimestampType)) - checkTypePromotion(DateUtils.millisToDays(3601000), enforceCorrectType(ISO8601Time1, DateType)) + checkTypePromotion(DateTimeUtils.millisToDays(3601000), + enforceCorrectType(ISO8601Time1, DateType)) val ISO8601Time2 = "1970-01-01T02:00:01-01:00" - checkTypePromotion(DateUtils.fromJavaTimestamp(new Timestamp(10801000)), + checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)), enforceCorrectType(ISO8601Time2, TimestampType)) - checkTypePromotion(DateUtils.millisToDays(10801000), enforceCorrectType(ISO8601Time2, DateType)) + checkTypePromotion(DateTimeUtils.millisToDays(10801000), + enforceCorrectType(ISO8601Time2, DateType)) } test("Get compatible type") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index 284d99d4938d1..47a7be1c6a664 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -37,7 +37,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport @@ -137,7 +137,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest { def makeDateRDD(): DataFrame = sqlContext.sparkContext .parallelize(0 to 1000) - .map(i => Tuple1(DateUtils.toJavaDate(i))) + .map(i => Tuple1(DateTimeUtils.toJavaDate(i))) .toDF() .select($"_1") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala index 48875773224c7..79eac930e54f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala @@ -17,13 +17,12 @@ package org.apache.spark.sql.sources -import java.sql.{Timestamp, Date} - +import java.sql.{Date, Timestamp} import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -84,8 +83,8 @@ case class AllDataTypesScan( i.toDouble, Decimal(new java.math.BigDecimal(i)), Decimal(new java.math.BigDecimal(i)), - DateUtils.fromJavaDate(new Date(1970, 1, 1)), - DateUtils.fromJavaTimestamp(new Timestamp(20000 + i)), + DateTimeUtils.fromJavaDate(new Date(1970, 1, 1)), + DateTimeUtils.fromJavaTimestamp(new Timestamp(20000 + i)), UTF8String.fromString(s"varchar_$i"), Seq(i, i + 1), Seq(Map(UTF8String.fromString(s"str_$i") -> InternalRow(i.toLong))), @@ -93,7 +92,7 @@ case class AllDataTypesScan( Map(Map(UTF8String.fromString(s"str_$i") -> i.toFloat) -> InternalRow(i.toLong)), Row(i, i.toString), Row(Seq(UTF8String.fromString(s"str_$i"), UTF8String.fromString(s"str_${i + 1}")), - InternalRow(Seq(DateUtils.fromJavaDate(new Date(1970, 1, i + 1)))))) + InternalRow(Seq(DateTimeUtils.fromJavaDate(new Date(1970, 1, i + 1)))))) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index d4f1ae8ee01d9..864c888ab073d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.hive.serde2.{io => hiveIo} import org.apache.hadoop.{io => hadoopIo} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.types import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -273,7 +273,7 @@ private[hive] trait HiveInspectors { System.arraycopy(writable.getBytes, 0, temp, 0, temp.length) temp case poi: WritableConstantDateObjectInspector => - DateUtils.fromJavaDate(poi.getWritableConstantValue.get()) + DateTimeUtils.fromJavaDate(poi.getWritableConstantValue.get()) case mi: StandardConstantMapObjectInspector => // take the value from the map inspector object, rather than the input data mi.getWritableConstantValue.map { case (k, v) => @@ -313,13 +313,13 @@ private[hive] trait HiveInspectors { System.arraycopy(bw.getBytes(), 0, result, 0, bw.getLength()) result case x: DateObjectInspector if x.preferWritable() => - DateUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get()) - case x: DateObjectInspector => DateUtils.fromJavaDate(x.getPrimitiveJavaObject(data)) + DateTimeUtils.fromJavaDate(x.getPrimitiveWritableObject(data).get()) + case x: DateObjectInspector => DateTimeUtils.fromJavaDate(x.getPrimitiveJavaObject(data)) case x: TimestampObjectInspector if x.preferWritable() => val t = x.getPrimitiveWritableObject(data) t.getSeconds * 10000000L + t.getNanos / 100 case ti: TimestampObjectInspector => - DateUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data)) + DateTimeUtils.fromJavaTimestamp(ti.getPrimitiveJavaObject(data)) case _ => pi.getPrimitiveJavaObject(data) } case li: ListObjectInspector => @@ -356,10 +356,10 @@ private[hive] trait HiveInspectors { (o: Any) => HiveDecimal.create(o.asInstanceOf[Decimal].toJavaBigDecimal) case _: JavaDateObjectInspector => - (o: Any) => DateUtils.toJavaDate(o.asInstanceOf[Int]) + (o: Any) => DateTimeUtils.toJavaDate(o.asInstanceOf[Int]) case _: JavaTimestampObjectInspector => - (o: Any) => DateUtils.toJavaTimestamp(o.asInstanceOf[Long]) + (o: Any) => DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long]) case soi: StandardStructObjectInspector => val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector)) @@ -468,9 +468,9 @@ private[hive] trait HiveInspectors { case _: BinaryObjectInspector if x.preferWritable() => getBinaryWritable(a) case _: BinaryObjectInspector => a.asInstanceOf[Array[Byte]] case _: DateObjectInspector if x.preferWritable() => getDateWritable(a) - case _: DateObjectInspector => DateUtils.toJavaDate(a.asInstanceOf[Int]) + case _: DateObjectInspector => DateTimeUtils.toJavaDate(a.asInstanceOf[Int]) case _: TimestampObjectInspector if x.preferWritable() => getTimestampWritable(a) - case _: TimestampObjectInspector => DateUtils.toJavaTimestamp(a.asInstanceOf[Long]) + case _: TimestampObjectInspector => DateTimeUtils.toJavaTimestamp(a.asInstanceOf[Long]) } case x: SettableStructObjectInspector => val fieldRefs = x.getAllStructFieldRefs @@ -781,7 +781,7 @@ private[hive] trait HiveInspectors { if (value == null) { null } else { - new hiveIo.TimestampWritable(DateUtils.toJavaTimestamp(value.asInstanceOf[Long])) + new hiveIo.TimestampWritable(DateTimeUtils.toJavaTimestamp(value.asInstanceOf[Long])) } private def getDecimalWritable(value: Any): hiveIo.HiveDecimalWritable = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 439f39bafc926..00e61e35d4354 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -29,11 +29,11 @@ import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf} -import org.apache.spark.{Logging} +import org.apache.spark.Logging import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.util.{SerializableConfiguration, Utils} /** @@ -362,10 +362,10 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { row.update(ordinal, HiveShim.toCatalystDecimal(oi, value)) case oi: TimestampObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => - row.setLong(ordinal, DateUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value))) + row.setLong(ordinal, DateTimeUtils.fromJavaTimestamp(oi.getPrimitiveJavaObject(value))) case oi: DateObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => - row.setInt(ordinal, DateUtils.fromJavaDate(oi.getPrimitiveJavaObject(value))) + row.setInt(ordinal, DateTimeUtils.fromJavaDate(oi.getPrimitiveJavaObject(value))) case oi: BinaryObjectInspector => (value: Any, row: MutableRow, ordinal: Int) => row.update(ordinal, oi.getPrimitiveJavaObject(value)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 8b928861fcc70..ab75b12e2a2e7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.hive.common.FileUtils import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.sql.Row import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter} -import org.apache.spark.sql.catalyst.util.DateUtils +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableJobConf @@ -201,7 +201,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( def convertToHiveRawString(col: String, value: Any): String = { val raw = String.valueOf(value) schema(col).dataType match { - case DateType => DateUtils.toString(raw.toInt) + case DateType => DateTimeUtils.toString(raw.toInt) case _: DecimalType => BigDecimal(raw).toString() case _ => raw }