[SPARK-23715][SQL] the input of to/from_utc_timestamp can not have timezone #21169

Closed · wants to merge 4 commits

13 changes: 7 additions & 6 deletions docs/sql-programming-guide.md

@@ -1805,12 +1805,13 @@ working with timestamps in `pandas_udf`s to get the best performance, see

- Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively.
- In PySpark, when Arrow optimization is enabled, previously `toPandas` just failed when Arrow optimization could not be used, whereas `createDataFrame` from a Pandas DataFrame allowed the fallback to non-optimization. Now, both `toPandas` and `createDataFrame` from a Pandas DataFrame allow the fallback by default, which can be switched off by `spark.sql.execution.arrow.fallback.enabled`.
- Since Spark 2.4, writing an empty dataframe to a directory launches at least one write task, even if physically the dataframe has no partition. This introduces a small behavior change: for self-describing file formats like Parquet and ORC, Spark creates a metadata-only file in the target directory when writing a 0-partition dataframe, so that schema inference can still work if users read that directory later. The new behavior is more reasonable and more consistent with respect to writing empty dataframes.
Contributor Author commented:

fix indentation

- Since Spark 2.4, expression IDs in UDF arguments do not appear in column names. For example, a column name in Spark 2.4 is not `UDF:f(col0 AS colA#28)` but ``UDF:f(col0 AS `colA`)``.
- Since Spark 2.4, writing a dataframe with an empty or nested empty schema using any file format (parquet, orc, json, text, csv, etc.) is not allowed. An exception is thrown when attempting to write dataframes with an empty schema.
Contributor commented:

what's a nested empty schema?

Contributor Author commented:

Like `new StructType().add("empty", new StructType())`: the table has a column, and the column is a struct type with 0 fields. This schema is invalid to write out.

Anyway this is an existing comment and I just fixed its indentation.
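
For illustration (an editorial sketch, not part of the thread; the variable name is hypothetical), such a nested empty schema can be constructed like this:

```scala
import org.apache.spark.sql.types.StructType

// One column named "empty" whose type is a struct with zero fields; writing a
// dataframe with this schema is rejected since Spark 2.4.
val nestedEmpty: StructType = new StructType().add("empty", new StructType())
```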

Member commented:

Let us create a dedicated ticket and let the community improve them?

- Since Spark 2.4, Spark compares a DATE type with a TIMESTAMP type after promoting both sides to TIMESTAMP. Setting `spark.sql.hive.compareDateTimestampInTimestamp` to `false` restores the previous behavior. This option will be removed in Spark 3.0.
- Since Spark 2.4, creating a managed table with a nonempty location is not allowed. An exception is thrown when attempting to create a managed table with a nonempty location. Setting `spark.sql.allowCreatingManagedTableUsingNonemptyLocation` to `true` restores the previous behavior. This option will be removed in Spark 3.0.
- Since Spark 2.4, the type coercion rules can automatically promote the argument types of the variadic SQL functions (e.g., IN/COALESCE) to the widest common type, regardless of the order of the input arguments. In prior Spark versions, the promotion could fail in some specific orders (e.g., TimestampType, IntegerType and StringType) and throw an exception.
- In version 2.3 and earlier, `to_utc_timestamp` and `from_utc_timestamp` respect the timezone in the input timestamp string, which breaks the assumption that the input timestamp is in a specific timezone. Therefore, these two functions can return unexpected results. In version 2.4 and later, this problem has been fixed: `to_utc_timestamp` and `from_utc_timestamp` return null if the input timestamp string contains a timezone. As an example, `from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')` returns `2000-10-10 01:00:00` in both Spark 2.3 and 2.4. However, `from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')`, assuming a local timezone of GMT+8, returns `2000-10-10 09:00:00` in Spark 2.3 but `null` in 2.4. Users who do not care about this problem and want to retain the previous behavior to keep their queries unchanged can set `spark.sql.function.rejectTimezoneInString` to `false`. This option will be removed in Spark 3.0 and should only be used as a temporary workaround.
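
For illustration (an editorial sketch, not part of the diff), the new behavior can be observed as follows, assuming an existing Spark 2.4 `SparkSession` named `spark` and the GMT+8 local timezone used in the example above:

```scala
// Editorial sketch; results follow the migration note above.
spark.sql("select from_utc_timestamp('2000-10-10 00:00:00', 'GMT+1')").show()
// -> 2000-10-10 01:00:00 (same result in Spark 2.3 and 2.4)

// The input string carries its own timezone, so Spark 2.4 returns null:
spark.sql("select from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')").show()
// -> null (Spark 2.3 would return 2000-10-10 09:00:00 under a GMT+8 local timezone)

// Temporary escape hatch (internal conf, to be removed in Spark 3.0):
spark.conf.set("spark.sql.function.rejectTimezoneInString", "false")
```
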
## Upgrading From Spark SQL 2.2 to 2.3

- Since Spark 2.3, queries over raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then run the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.

@@ -59,7 +59,7 @@ object TypeCoercion {
IfCoercion ::
StackCoercion ::
Division ::
-    ImplicitTypeCasts ::
+    new ImplicitTypeCasts(conf) ::
DateTimeOperations ::
WindowFrameCoercion ::
Nil

@@ -776,12 +776,33 @@ object TypeCoercion {
/**
* Casts types according to the expected input types for [[Expression]]s.
*/
-object ImplicitTypeCasts extends TypeCoercionRule {
+class ImplicitTypeCasts(conf: SQLConf) extends TypeCoercionRule {

private def rejectTzInString = conf.getConf(SQLConf.REJECT_TIMEZONE_IN_STRING)

override protected def coerceTypes(
plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
// Skip nodes whose children have not been resolved yet.
case e if !e.childrenResolved => e

// Special rules for `from/to_utc_timestamp`. These 2 functions assume the input timestamp
// string is in a specific timezone, so the string itself must not contain a timezone.
// TODO: We should move the type coercion logic to expressions instead of a central
// place to put all the rules.
case e: FromUTCTimestamp if e.left.dataType == StringType =>
if (rejectTzInString) {
e.copy(left = StringToTimestampWithoutTimezone(e.left))
} else {
e.copy(left = Cast(e.left, TimestampType))
}

case e: ToUTCTimestamp if e.left.dataType == StringType =>
if (rejectTzInString) {
e.copy(left = StringToTimestampWithoutTimezone(e.left))
} else {
e.copy(left = Cast(e.left, TimestampType))
}

case b @ BinaryOperator(left, right) if left.dataType != right.dataType =>
findTightestCommonType(left.dataType, right.dataType).map { commonType =>
if (b.inputType.acceptsType(commonType)) {

@@ -798,7 +819,7 @@
case e: ImplicitCastInputTypes if e.inputTypes.nonEmpty =>
val children: Seq[Expression] = e.children.zip(e.inputTypes).map { case (in, expected) =>
// If we cannot do the implicit cast, just use the original input.
-    implicitCast(in, expected).getOrElse(in)
+    ImplicitTypeCasts.implicitCast(in, expected).getOrElse(in)
}
e.withNewChildren(children)


@@ -814,6 +835,9 @@
}
e.withNewChildren(children)
}
}

object ImplicitTypeCasts {

/**
* Given an expected data type, try to cast the expression and return the cast expression.

@@ -1016,6 +1016,48 @@ case class TimeAdd(start: Expression, interval: Expression, timeZoneId: Option[S
}
}

/**
* A special expression used to convert the string input of `to/from_utc_timestamp` to timestamp,
* which requires the timestamp string to contain no timezone information; otherwise null is returned.
*/
case class StringToTimestampWithoutTimezone(child: Expression, timeZoneId: Option[String] = None)
extends UnaryExpression with TimeZoneAwareExpression with ExpectsInputTypes {

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override def inputTypes: Seq[AbstractDataType] = Seq(StringType)
override def dataType: DataType = TimestampType
override def nullable: Boolean = true
override def toString: String = child.toString
override def sql: String = child.sql

override def nullSafeEval(input: Any): Any = {
DateTimeUtils.stringToTimestamp(
input.asInstanceOf[UTF8String], timeZone, rejectTzInString = true).orNull
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
val tz = ctx.addReferenceObj("timeZone", timeZone)
val longOpt = ctx.freshName("longOpt")
val eval = child.genCode(ctx)
val code = s"""
|${eval.code}
|${CodeGenerator.JAVA_BOOLEAN} ${ev.isNull} = true;
|${CodeGenerator.JAVA_LONG} ${ev.value} = ${CodeGenerator.defaultValue(TimestampType)};
|if (!${eval.isNull}) {
| scala.Option<Long> $longOpt = $dtu.stringToTimestamp(${eval.value}, $tz, true);
| if ($longOpt.isDefined()) {
| ${ev.value} = ((Long) $longOpt.get()).longValue();
| ${ev.isNull} = false;
| }
|}
""".stripMargin
ev.copy(code = code)
}
}
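
For illustration (an editorial sketch, not part of the diff; the value names are hypothetical), the new expression can be exercised directly:

```scala
import org.apache.spark.sql.catalyst.expressions.Literal

// Parse the string in the given timezone, but yield null when the string
// itself carries a timezone offset.
val ok  = StringToTimestampWithoutTimezone(Literal("2000-10-10 00:00:00"), Some("GMT+8"))
val bad = StringToTimestampWithoutTimezone(Literal("2000-10-10 00:00:00+00:00"), Some("GMT+8"))
// ok.eval()  -> microseconds since the epoch for 2000-10-10 00:00:00 in GMT+8
// bad.eval() -> null, because the input string contains a timezone part
```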

/**
* Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in UTC, and renders
* that time as a timestamp in the given time zone. For example, 'GMT+1' would yield

@@ -296,10 +296,28 @@ object DateTimeUtils {
* `T[h]h:[m]m:[s]s.[ms][ms][ms][us][us][us]+[h]h:[m]m`
*/
def stringToTimestamp(s: UTF8String): Option[SQLTimestamp] = {
-  stringToTimestamp(s, defaultTimeZone())
+  stringToTimestamp(s, defaultTimeZone(), rejectTzInString = false)
}

def stringToTimestamp(s: UTF8String, timeZone: TimeZone): Option[SQLTimestamp] = {
stringToTimestamp(s, timeZone, rejectTzInString = false)
}

/**
* Converts a timestamp string to microseconds from the unix epoch, w.r.t. the given timezone.
Member commented:

BTW, I usually avoid abbreviation in doc tho (w.r.t.).

* Returns None if the input string is not in a valid timestamp format.
*
* @param s the input timestamp string.
* @param timeZone the timezone of the timestamp string; it is ignored if the timestamp string
* already contains timezone information.
* @param rejectTzInString if true, reject any timezone in the input string, i.e., if the
* timestamp string contains a timezone, like `2000-10-10 00:00:00+00:00`,
* return None.
*/
def stringToTimestamp(
s: UTF8String,
timeZone: TimeZone,
rejectTzInString: Boolean): Option[SQLTimestamp] = {
if (s == null) {
return None
}

@@ -417,6 +435,8 @@ object DateTimeUtils {
return None
}

if (tz.isDefined && rejectTzInString) return None
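// Editorial note: the check above rejects strings that carry their own timezone
// (`tz` was parsed out of the string itself) when the caller sets `rejectTzInString`.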

val c = if (tz.isEmpty) {
Calendar.getInstance(timeZone)
} else {

@@ -1208,6 +1208,13 @@ object SQLConf {
.stringConf
.createWithDefault("")

val REJECT_TIMEZONE_IN_STRING = buildConf("spark.sql.function.rejectTimezoneInString")
.internal()
.doc("If true, `to_utc_timestamp` and `from_utc_timestamp` return null if the input string " +
"contains a timezone part, e.g. `2000-10-10 00:00:00+00:00`.")
.booleanConf
.createWithDefault(true)

Contributor commented:

Why would we need this? Currently, if a user passes '2000-10-10 00:00:00+00:00' to <to/from>_utc_timestamp, they get the wrong answer. If they switch off this setting, they will continue to get the wrong answer rather than null. Are we accommodating the users who experienced this bug and are manually shifting the result?

Contributor Author commented:

existing workloads may depend on the previous behavior and assume it is correct. It's safer to provide an internal conf and tell users about it when they complain about the behavior change. It's an internal conf and is invisible to end users.

Contributor commented:

> existing workloads may depend on the previous behavior and assume it is correct

Then I think we need test cases (maybe in DateFunctionsSuite, since those tests would also exercise the type coercion), where REJECT_TIMEZONE_IN_STRING is false.
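
A sketch of such a test (editorial; it assumes the standard `withSQLConf`/`checkAnswer` helpers from Spark's SQL test suites and the conf added in this PR):

```scala
// Editorial sketch, not part of the diff. With the legacy behavior, the string's
// own timezone is honored via a plain cast instead of returning null.
test("from_utc_timestamp with timezone in string, legacy behavior") {
  withSQLConf(SQLConf.REJECT_TIMEZONE_IN_STRING.key -> "false") {
    checkAnswer(
      sql("select from_utc_timestamp('2000-10-10 00:00:00+00:00', 'GMT+1')"),
      sql("select from_utc_timestamp(cast('2000-10-10 00:00:00+00:00' as timestamp), 'GMT+1')"))
  }
}
```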

object PartitionOverwriteMode extends Enumeration {
val STATIC, DYNAMIC = Value
}

@@ -524,23 +524,23 @@ class TypeCoercionSuite extends AnalysisTest {
test("cast NullType for expressions that implement ExpectsInputTypes") {
import TypeCoercionSuite._

-    ruleTest(TypeCoercion.ImplicitTypeCasts,
+    ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
AnyTypeUnaryExpression(Literal.create(null, NullType)),
AnyTypeUnaryExpression(Literal.create(null, NullType)))

-    ruleTest(TypeCoercion.ImplicitTypeCasts,
+    ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
NumericTypeUnaryExpression(Literal.create(null, NullType)),
NumericTypeUnaryExpression(Literal.create(null, DoubleType)))
}

test("cast NullType for binary operators") {
import TypeCoercionSuite._

-    ruleTest(TypeCoercion.ImplicitTypeCasts,
+    ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
AnyTypeBinaryOperator(Literal.create(null, NullType), Literal.create(null, NullType)),
AnyTypeBinaryOperator(Literal.create(null, NullType), Literal.create(null, NullType)))

-    ruleTest(TypeCoercion.ImplicitTypeCasts,
+    ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
NumericTypeBinaryOperator(Literal.create(null, NullType), Literal.create(null, NullType)),
NumericTypeBinaryOperator(Literal.create(null, DoubleType), Literal.create(null, DoubleType)))
}

@@ -823,7 +823,7 @@ }
}

test("type coercion for CaseKeyWhen") {
-    ruleTest(TypeCoercion.ImplicitTypeCasts,
+    ruleTest(new TypeCoercion.ImplicitTypeCasts(conf),
CaseKeyWhen(Literal(1.toShort), Seq(Literal(1), Literal("a"))),
CaseKeyWhen(Cast(Literal(1.toShort), IntegerType), Seq(Literal(1), Literal("a")))
)

@@ -1275,7 +1275,7 @@ }
}

test("SPARK-17117 null type coercion in divide") {
-    val rules = Seq(FunctionArgumentConversion, Division, ImplicitTypeCasts)
+    val rules = Seq(FunctionArgumentConversion, Division, new ImplicitTypeCasts(conf))
val nullLit = Literal.create(null, NullType)
ruleTest(rules, Divide(1L, nullLit), Divide(Cast(1L, DoubleType), Cast(nullLit, DoubleType)))
ruleTest(rules, Divide(nullLit, 1L), Divide(Cast(nullLit, DoubleType), Cast(1L, DoubleType)))
33 changes: 33 additions & 0 deletions sql/core/src/test/resources/sql-tests/inputs/datetime.sql

@@ -27,3 +27,36 @@ select current_date = current_date(), current_timestamp = current_timestamp(), a
select a, b from ttf2 order by a, current_date;

select weekday('2007-02-03'), weekday('2009-07-30'), weekday('2017-05-27'), weekday(null), weekday('1582-10-15 13:10:15');

Contributor commented:

Does it matter if there are no success cases for from_utc_timestamp and to_utc_timestamp in here (that is, cases that don't return null)?

select from_utc_timestamp('2015-07-24 00:00:00', 'PST');

select from_utc_timestamp('2015-01-24 00:00:00', 'PST');

select from_utc_timestamp(null, 'PST');

select from_utc_timestamp('2015-07-24 00:00:00', null);

select from_utc_timestamp(null, null);

select from_utc_timestamp(cast(0 as timestamp), 'PST');

select from_utc_timestamp(cast('2015-01-24' as date), 'PST');

select to_utc_timestamp('2015-07-24 00:00:00', 'PST');

select to_utc_timestamp('2015-01-24 00:00:00', 'PST');

select to_utc_timestamp(null, 'PST');

select to_utc_timestamp('2015-07-24 00:00:00', null);

select to_utc_timestamp(null, null);

select to_utc_timestamp(cast(0 as timestamp), 'PST');

select to_utc_timestamp(cast('2015-01-24' as date), 'PST');

-- SPARK-23715: the input of to/from_utc_timestamp can not have timezone
select from_utc_timestamp('2000-10-10 00:00:00+00:00', 'PST');

select to_utc_timestamp('2000-10-10 00:00:00+00:00', 'PST');
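
-- Editorial note: with the new default (spark.sql.function.rejectTimezoneInString = true),
-- both queries above are expected to return NULL, per the migration note in this PR.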