Skip to content

[SPARK-31710][SQL][FOLLOWUP] Replace CAST by TIMESTAMP_SECONDS in benchmarks #28843

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark {

// Runs a codegen benchmark titled "<func> of timestamp" in which `func` is applied to a
// timestamp built from the `id` column; the work is delegated to codegenBenchmark and
// doBenchmark.
// NOTE(review): this is a diff rendering without +/- markers. Going by the PR title
// (replace CAST by TIMESTAMP_SECONDS), the first doBenchmark line below looks like the
// removed version and the second like its replacement; only one of them is meant to exist.
private def run(cardinality: Int, func: String): Unit = {
codegenBenchmark(s"$func of timestamp", cardinality) {
// Pre-change form (presumably removed by the PR): timestamp obtained via CAST.
doBenchmark(cardinality, s"$func(cast(id as timestamp))")
// Post-change form (presumably added by the PR): timestamp obtained via timestamp_seconds.
doBenchmark(cardinality, s"$func(timestamp_seconds(id))")
}
}

Expand All @@ -64,7 +64,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
val N = 10000000
runBenchmark("datetime +/- interval") {
val benchmark = new Benchmark("datetime +/- interval", N, output = output)
val ts = "cast(id as timestamp)"
val ts = "timestamp_seconds(id)"
val dt = s"cast($ts as date)"
benchmark.addCase("date + interval(m)") { _ =>
doBenchmark(N, s"$dt + interval 1 month")
Expand Down Expand Up @@ -105,7 +105,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
benchmark.run()
}
runBenchmark("Extract components") {
run(N, "cast to timestamp", "cast(id as timestamp)")
run(N, "cast to timestamp", "timestamp_seconds(id)")
run(N, "year")
run(N, "quarter")
run(N, "month")
Expand All @@ -124,7 +124,7 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
run(N, "current_timestamp", "current_timestamp")
}
runBenchmark("Date arithmetic") {
val dateExpr = "cast(cast(id as timestamp) as date)"
val dateExpr = "cast(timestamp_seconds(id) as date)"
run(N, "cast to date", dateExpr)
run(N, "last_day", s"last_day($dateExpr)")
run(N, "next_day", s"next_day($dateExpr, 'TU')")
Expand All @@ -133,31 +133,31 @@ object DateTimeBenchmark extends SqlBasedBenchmark {
run(N, "add_months", s"add_months($dateExpr, 10)")
}
runBenchmark("Formatting dates") {
val dateExpr = "cast(cast(id as timestamp) as date)"
val dateExpr = "cast(timestamp_seconds(id) as date)"
run(N, "format date", s"date_format($dateExpr, 'MMM yyyy')")
}
runBenchmark("Formatting timestamps") {
run(N, "from_unixtime", "from_unixtime(id, 'yyyy-MM-dd HH:mm:ss.SSSSSS')")
}
runBenchmark("Convert timestamps") {
val timestampExpr = "cast(id as timestamp)"
val timestampExpr = "timestamp_seconds(id)"
run(N, "from_utc_timestamp", s"from_utc_timestamp($timestampExpr, 'CET')")
run(N, "to_utc_timestamp", s"to_utc_timestamp($timestampExpr, 'CET')")
}
runBenchmark("Intervals") {
val (start, end) = ("cast(id as timestamp)", "cast((id+8640000) as timestamp)")
val (start, end) = ("timestamp_seconds(id)", "timestamp_seconds(id+8640000)")
run(N, "cast interval", start, end)
run(N, "datediff", s"datediff($start, $end)")
run(N, "months_between", s"months_between($start, $end)")
run(1000000, "window", s"window($start, 100, 10, 1)")
}
runBenchmark("Truncation") {
val timestampExpr = "cast(id as timestamp)"
val timestampExpr = "timestamp_seconds(id)"
Seq("YEAR", "YYYY", "YY", "MON", "MONTH", "MM", "DAY", "DD", "HOUR", "MINUTE",
"SECOND", "WEEK", "QUARTER").foreach { level =>
run(N, s"date_trunc $level", s"date_trunc('$level', $timestampExpr)")
}
val dateExpr = "cast(cast(id as timestamp) as date)"
val dateExpr = "cast(timestamp_seconds(id) as date)"
Seq("year", "yyyy", "yy", "mon", "month", "mm").foreach { level =>
run(N, s"trunc $level", s"trunc('$level', $dateExpr)")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ object ExtractBenchmark extends SqlBasedBenchmark {
}

/**
 * Maps a column type name to the SQL expression that derives a value of that type from
 * the `id` column.
 *
 * NOTE(review): this block is a diff rendering without +/- markers. Each supported type
 * appears twice: first the CAST-based expression, then the TIMESTAMP_SECONDS-based one.
 * Going by the PR title, the first group looks like the removed lines and the second like
 * their replacements. Read literally, the first matching case wins and the second group
 * is unreachable.
 *
 * @param from column type name: "timestamp", "date" or "interval"
 * @return the SQL expression text for that type
 * @throws IllegalArgumentException if `from` is not one of the supported type names
 */
private def castExpr(from: String): String = from match {
// Pre-change cases (presumably removed by the PR): CAST-based expressions.
case "timestamp" => "cast(id as timestamp)"
case "date" => "cast(cast(id as timestamp) as date)"
case "interval" => "(cast(cast(id as timestamp) as date) - date'0001-01-01') + " +
"(cast(id as timestamp) - timestamp'1000-01-01 01:02:03.123456')"
// Post-change cases (presumably added by the PR): TIMESTAMP_SECONDS-based expressions.
case "timestamp" => "timestamp_seconds(id)"
case "date" => "cast(timestamp_seconds(id) as date)"
case "interval" => "(cast(timestamp_seconds(id) as date) - date'0001-01-01') + " +
"(timestamp_seconds(id) - timestamp'1000-01-01 01:02:03.123456')"
// The message lists every type accepted above; it previously omitted 'interval'.
case other => throw new IllegalArgumentException(
s"Unsupported column type $other. Valid column types are 'timestamp', 'date' and 'interval'")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.util.Random
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.functions.{monotonically_increasing_id, timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType
import org.apache.spark.sql.types.{ByteType, Decimal, DecimalType, TimestampType}
Expand Down Expand Up @@ -332,11 +332,11 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> fileType) {
val columns = (1 to width).map(i => s"CAST(id AS string) c$i")
val df = spark.range(numRows).selectExpr(columns: _*)
.withColumn("value", monotonically_increasing_id().cast(TimestampType))
.withColumn("value", timestamp_seconds(monotonically_increasing_id()))
withTempTable("orcTable", "parquetTable") {
saveAsTable(df, dir)

Seq(s"value = CAST($mid AS timestamp)").foreach { whereExpr =>
Seq(s"value = timestamp_seconds($mid)").foreach { whereExpr =>
val title = s"Select 1 timestamp stored as $fileType row ($whereExpr)"
.replace("value AND value", "value")
filterPushDownBenchmark(numRows, title, whereExpr)
Expand All @@ -348,8 +348,8 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
filterPushDownBenchmark(
numRows,
s"Select $percent% timestamp stored as $fileType rows " +
s"(value < CAST(${numRows * percent / 100} AS timestamp))",
s"value < CAST(${numRows * percent / 100} as timestamp)",
s"(value < timestamp_seconds(${numRows * percent / 100}))",
s"value < timestamp_seconds(${numRows * percent / 100})",
selectExpr
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.benchmark

import org.apache.spark.benchmark.Benchmark
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, struct}
import org.apache.spark.sql.functions.{array, struct, timestamp_seconds}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -128,15 +128,15 @@ object InExpressionBenchmark extends SqlBasedBenchmark {

// Builds `numItems` timestamp value expressions and a DataFrame of `numRows` timestamps
// derived from spark.range's `id` column, then hands both to runBenchmark under the name
// "<numItems> timestamps".
// NOTE(review): this is a diff rendering without +/- markers, so `values` and `df` each
// appear twice. Going by the PR title, the first pair looks like the removed lines and the
// second pair like their replacements; read literally, the duplicate vals would not compile.
private def runTimestampBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = {
val name = s"$numItems timestamps"
// Pre-change pair (presumably removed by the PR): CAST of a string to timestamp, and a
// Column.cast of `id` to TimestampType.
val values = (1 to numItems).map(m => s"CAST('1970-01-01 01:00:00.$m' AS timestamp)")
val df = spark.range(0, numRows).select($"id".cast(TimestampType))
// Post-change pair (presumably added by the PR): timestamp literals, and
// timestamp_seconds over `id` with the result aliased as "id".
val values = (1 to numItems).map(m => s"timestamp'1970-01-01 01:00:00.$m'")
val df = spark.range(0, numRows).select(timestamp_seconds($"id").as("id"))
runBenchmark(name, df, values, numRows, minNumIters)
}

// Builds `numItems` date value expressions (January 1st of the years 1971 through
// 1970 + numItems) and a DataFrame of `numRows` dates derived from spark.range's `id`
// column, then hands both to runBenchmark under the name "<numItems> dates".
// NOTE(review): this is a diff rendering without +/- markers, so `values` and `df` each
// appear twice. Going by the PR title, the first pair looks like the removed lines and the
// second pair like their replacements; read literally, the duplicate vals would not compile.
private def runDateBenchmark(numItems: Int, numRows: Long, minNumIters: Int): Unit = {
val name = s"$numItems dates"
// Pre-change pair (presumably removed by the PR): CAST of a string to date, and `id`
// cast to TimestampType and then to DateType.
val values = (1 to numItems).map(n => 1970 + n).map(y => s"CAST('$y-01-01' AS date)")
val df = spark.range(0, numRows).select($"id".cast(TimestampType).cast(DateType))
// Post-change pair (presumably added by the PR): date literals, and timestamp_seconds
// over `id` cast to DateType with the result aliased as "id".
val values = (1 to numItems).map(n => 1970 + n).map(y => s"date'$y-01-01'")
val df = spark.range(0, numRows).select(timestamp_seconds($"id").cast(DateType).as("id"))
runBenchmark(name, df, values, numRows, minNumIters)
}

Expand Down