
Commit a39067d: fix-code-style

Merge commit with 2 parents: e215f1c + 4577fa8

File tree: 3 files changed (+159, -89 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala

Lines changed: 0 additions & 4 deletions
@@ -61,7 +61,6 @@ object Cast {
     case (DateType, TimestampType) => true
     case (_: NumericType, TimestampType) => if (SQLConf.get.numericConvertToTimestampEnable) true
       else false
-
     case (StringType, DateType) => true
     case (TimestampType, DateType) => true

@@ -464,7 +463,6 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
     if (SQLConf.get.numericConvertToTimestampInSeconds) t * MICROS_PER_SECOND
     else t * MILLIS_PER_SECOND
   }
-
   // converting us to seconds
   private[this] def timestampToLong(ts: Long): Long = {
     Math.floorDiv(ts, MICROS_PER_SECOND)

@@ -1287,13 +1285,11 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
     val block = inline"new java.math.BigDecimal($MICROS_PER_SECOND)"
     code"($d.toBigDecimal().bigDecimal().multiply($block)).longValue()"
   }
-
   private[this] def longToTimeStampCode(l: ExprValue): Block = {
     if (SQLConf.get.numericConvertToTimestampInSeconds) code"" +
       code"$l * $MICROS_PER_SECOND"
     else code"$l * $MILLIS_PER_SECOND"
   }
-
   private[this] def timestampToLongCode(ts: ExprValue): Block =
     code"java.lang.Math.floorDiv($ts, $MICROS_PER_SECOND)"
   private[this] def timestampToDoubleCode(ts: ExprValue): Block =
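The hunks above only drop stray blank lines, but they surround the code path that interprets an integral value as a timestamp. As a reference for that behaviour, here is a minimal, self-contained Scala sketch of the conversion, not the Spark source: the constants mirror MICROS_PER_SECOND and MILLIS_PER_SECOND, the boolean stands in for spark.sql.legacy.numericConvertToTimestampInSeconds, and the asserted values are taken from the CastSuite changes further below.

  object LongToTimestampSketch {
    // Spark stores TimestampType internally as microseconds since the epoch.
    val MICROS_PER_SECOND: Long = 1000000L
    val MILLIS_PER_SECOND: Long = 1000L

    // `inSeconds` stands in for spark.sql.legacy.numericConvertToTimestampInSeconds.
    def longToTimestampMicros(t: Long, inSeconds: Boolean): Long =
      if (inSeconds) t * MICROS_PER_SECOND  // input treated as seconds
      else t * MILLIS_PER_SECOND            // input treated as milliseconds

    def main(args: Array[String]): Unit = {
      assert(longToTimestampMicros(123L, inSeconds = false) == 123000L)
      assert(longToTimestampMicros(123L, inSeconds = true) == 123000000L)
    }
  }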

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 68 additions & 76 deletions
@@ -621,7 +621,7 @@ object SQLConf {
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
     .checkValues(ParquetOutputTimestampType.values.map(_.toString))
-    .createWithDefault(ParquetOutputTimestampType.INT96.toString)
+    .createWithDefault(ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)

   val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
     .doc("Sets the compression codec used when writing Parquet files. If either `compression` or " +
@@ -845,10 +845,8 @@
     .doc("When true, enable the metadata-only query optimization that use the table's metadata " +
       "to produce the partition columns instead of table scans. It applies when all the columns " +
       "scanned are partition columns and the query has an aggregate operator that satisfies " +
-      "distinct semantics. By default the optimization is disabled, and deprecated as of Spark " +
-      "3.0 since it may return incorrect results when the files are empty, see also SPARK-26709." +
-      "It will be removed in the future releases. If you must use, use 'SparkSessionExtensions' " +
-      "instead to inject it as a custom rule.")
+      "distinct semantics. By default the optimization is disabled, since it may return " +
+      "incorrect results when the files are empty.")
     .version("2.1.1")
     .booleanConf
     .createWithDefault(false)
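The reworded doc above describes when the metadata-only optimization applies: every scanned column is a partition column and the aggregate has distinct semantics. A sketch of opting in and the kind of query it targets, assuming a SparkSession named spark and a hypothetical table partitioned_table with partition column part_col; as the doc warns, results can be wrong when the underlying files are empty:

  spark.conf.set("spark.sql.optimizer.metadataOnly", "true")
  // Only the partition column is referenced, so the answer can come from catalog metadata
  // instead of a table scan.
  spark.sql("SELECT DISTINCT part_col FROM partitioned_table").show()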
@@ -2065,18 +2063,16 @@
     .booleanConf
     .createWithDefault(true)

-  val NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST =
-    buildConf("spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources")
+  val NESTED_PREDICATE_PUSHDOWN_ENABLED =
+    buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
       .internal()
-      .doc("A comma-separated list of data source short names or fully qualified data source " +
-        "implementation class names for which Spark tries to push down predicates for nested " +
-        "columns and/or names containing `dots` to data sources. This configuration is only " +
-        "effective with file-based data source in DSv1. Currently, Parquet implements " +
-        "both optimizations while ORC only supports predicates for names containing `dots`. The " +
-        "other data sources don't support this feature yet. So the default value is 'parquet,orc'.")
+      .doc("When true, Spark tries to push down predicates for nested columns and or names " +
+        "containing `dots` to data sources. Currently, Parquet implements both optimizations " +
+        "while ORC only supports predicates for names containing `dots`. The other data sources" +
+        "don't support this feature yet.")
       .version("3.0.0")
-      .stringConf
-      .createWithDefault("parquet,orc")
+      .booleanConf
+      .createWithDefault(true)

   val SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED =
     buildConf("spark.sql.optimizer.serializer.nestedSchemaPruning.enabled")
@@ -2228,6 +2224,15 @@
     .booleanConf
     .createWithDefault(false)

+  val LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED =
+    buildConf("spark.sql.legacy.createHiveTableByDefault.enabled")
+      .internal()
+      .doc("When set to true, CREATE TABLE syntax without a provider will use hive " +
+        s"instead of the value of ${DEFAULT_DATA_SOURCE_NAME.key}.")
+      .version("3.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING =
     buildConf("spark.sql.legacy.bucketedTableScan.outputOrdering")
       .internal()
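A short sketch of the behaviour the new legacy flag above restores, assuming a Hive-enabled SparkSession named spark; with the flag off, the provider falls back to spark.sql.sources.default, the DEFAULT_DATA_SOURCE_NAME referenced in the doc string:

  spark.conf.set("spark.sql.legacy.createHiveTableByDefault.enabled", "true")
  spark.sql("CREATE TABLE t_hive (id INT, name STRING)")      // no USING clause: Hive table

  spark.conf.set("spark.sql.legacy.createHiveTableByDefault.enabled", "false")
  spark.sql("CREATE TABLE t_default (id INT, name STRING)")   // uses spark.sql.sources.default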
@@ -2519,72 +2524,57 @@
     .booleanConf
     .createWithDefault(false)

-  val LEGACY_PARQUET_REBASE_MODE_IN_WRITE =
-    buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInWrite")
+  val LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE =
+    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInWrite.enabled")
       .internal()
-      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
-        "to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files. " +
-        "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
-        "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
-        "ancient dates/timestamps that are ambiguous between the two calendars.")
+      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the hybrid calendar (Julian + Gregorian) in write. " +
+        "The rebasing is performed by converting micros/millis/days to " +
+        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
+        "timestamp in the target calendar, and getting the number of micros/millis/days " +
+        "since the epoch 1970-01-01 00:00:00Z.")
       .version("3.0.0")
-      .stringConf
-      .transform(_.toUpperCase(Locale.ROOT))
-      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
-      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
-
-  val LEGACY_PARQUET_REBASE_MODE_IN_READ =
-    buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead")
-      .internal()
-      .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
-        "Gregorian) calendar to Proleptic Gregorian calendar when reading Parquet files. " +
-        "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
-        "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
-        "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
-        "only effective if the writer info (like Spark, Hive) of the Parquet files is unknown.")
-      .version("3.0.0")
-      .stringConf
-      .transform(_.toUpperCase(Locale.ROOT))
-      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
-      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+      .booleanConf
+      .createWithDefault(false)

-  val LEGACY_AVRO_REBASE_MODE_IN_WRITE =
-    buildConf("spark.sql.legacy.avro.datetimeRebaseModeInWrite")
+  val LEGACY_PARQUET_REBASE_DATETIME_IN_READ =
+    buildConf("spark.sql.legacy.parquet.rebaseDateTimeInRead.enabled")
       .internal()
-      .doc("When LEGACY, Spark will rebase dates/timestamps from Proleptic Gregorian calendar " +
-        "to the legacy hybrid (Julian + Gregorian) calendar when writing Avro files. " +
-        "When CORRECTED, Spark will not do rebase and write the dates/timestamps as it is. " +
-        "When EXCEPTION, which is the default, Spark will fail the writing if it sees " +
-        "ancient dates/timestamps that are ambiguous between the two calendars.")
+      .doc("When true, rebase dates/timestamps " +
+        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
+        "The rebasing is performed by converting micros/millis/days to " +
+        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
+        "timestamp in the target calendar, and getting the number of micros/millis/days " +
+        "since the epoch 1970-01-01 00:00:00Z.")
       .version("3.0.0")
-      .stringConf
-      .transform(_.toUpperCase(Locale.ROOT))
-      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
-      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
-
-  val LEGACY_AVRO_REBASE_MODE_IN_READ =
-    buildConf("spark.sql.legacy.avro.datetimeRebaseModeInRead")
-      .internal()
-      .doc("When LEGACY, Spark will rebase dates/timestamps from the legacy hybrid (Julian + " +
-        "Gregorian) calendar to Proleptic Gregorian calendar when reading Avro files. " +
-        "When CORRECTED, Spark will not do rebase and read the dates/timestamps as it is. " +
-        "When EXCEPTION, which is the default, Spark will fail the reading if it sees " +
-        "ancient dates/timestamps that are ambiguous between the two calendars. This config is " +
-        "only effective if the writer info (like Spark, Hive) of the Avro files is unknown.")
+      .booleanConf
+      .createWithDefault(false)
+
+  val LEGACY_AVRO_REBASE_DATETIME_IN_WRITE =
+    buildConf("spark.sql.legacy.avro.rebaseDateTimeInWrite.enabled")
+      .internal()
+      .doc("When true, rebase dates/timestamps from Proleptic Gregorian calendar " +
+        "to the hybrid calendar (Julian + Gregorian) in write. " +
+        "The rebasing is performed by converting micros/millis/days to " +
+        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
+        "timestamp in the target calendar, and getting the number of micros/millis/days " +
+        "since the epoch 1970-01-01 00:00:00Z.")
       .version("3.0.0")
-      .stringConf
-      .transform(_.toUpperCase(Locale.ROOT))
-      .checkValues(LegacyBehaviorPolicy.values.map(_.toString))
-      .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)
+      .booleanConf
+      .createWithDefault(false)

-  val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT =
-    buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds")
+  val LEGACY_AVRO_REBASE_DATETIME_IN_READ =
+    buildConf("spark.sql.legacy.avro.rebaseDateTimeInRead.enabled")
       .internal()
-      .doc("Timeout for executor to wait for the termination of transformation script when EOF.")
+      .doc("When true, rebase dates/timestamps " +
+        "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
+        "The rebasing is performed by converting micros/millis/days to " +
+        "a local date/timestamp in the source calendar, interpreting the resulted date/" +
+        "timestamp in the target calendar, and getting the number of micros/millis/days " +
+        "since the epoch 1970-01-01 00:00:00Z.")
       .version("3.0.0")
-      .timeConf(TimeUnit.SECONDS)
-      .checkValue(_ > 0, "The timeout value must be positive")
-      .createWithDefault(10L)
+      .booleanConf
+      .createWithDefault(false)

   val LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE =
     buildConf("spark.sql.legacy.numericConvertToTimestampEnable")
@@ -2632,10 +2622,7 @@ object SQLConf {
       DeprecatedConfig(ARROW_FALLBACK_ENABLED.key, "3.0",
         s"Use '${ARROW_PYSPARK_FALLBACK_ENABLED.key}' instead of it."),
       DeprecatedConfig(SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key, "3.0",
-        s"Use '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' instead of it."),
-      DeprecatedConfig(OPTIMIZER_METADATA_ONLY.key, "3.0",
-        "Avoid to depend on this optimization to prevent a potential correctness issue. " +
-          "If you must use, use 'SparkSessionExtensions' instead to inject it as a custom rule.")
+        s"Use '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' instead of it.")
     )

     Map(configs.map { cfg => cfg.key -> cfg } : _*)
@@ -3128,6 +3115,8 @@ class SQLConf extends Serializable with Logging {

   def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)

+  def nestedPredicatePushdownEnabled: Boolean = getConf(NESTED_PREDICATE_PUSHDOWN_ENABLED)
+
   def serializerNestedSchemaPruningEnabled: Boolean =
     getConf(SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED)

@@ -3161,6 +3150,9 @@
   def allowNegativeScaleOfDecimalEnabled: Boolean =
     getConf(SQLConf.LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED)

+  def createHiveTableByDefaultEnabled: Boolean =
+    getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED)
+
   def truncateTableIgnorePermissionAcl: Boolean =
     getConf(SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala

Lines changed: 91 additions & 9 deletions
@@ -1300,15 +1300,97 @@ class CastSuite extends CastSuiteBase {
     }
   }

-  test("cast a timestamp before the epoch 1970-01-01 00:00:00Z") {
-    withDefaultTimeZone(UTC) {
-      val negativeTs = Timestamp.valueOf("1900-05-05 18:34:56.1")
-      assert(negativeTs.getTime < 0)
-      val expectedSecs = Math.floorDiv(negativeTs.getTime, MILLIS_PER_SECOND)
-      checkEvaluation(cast(negativeTs, ByteType), expectedSecs.toByte)
-      checkEvaluation(cast(negativeTs, ShortType), expectedSecs.toShort)
-      checkEvaluation(cast(negativeTs, IntegerType), expectedSecs.toInt)
-      checkEvaluation(cast(negativeTs, LongType), expectedSecs)
+  test("SPARK-31710:Add legacy when casting long to timestamp") {
+    withSQLConf(
+      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key -> "true",
+      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key -> "false") {
+      def checkLongToTimestamp(l: Long, expected: Long): Unit = {
+        checkEvaluation(cast(l, TimestampType), expected)
+      }
+      checkLongToTimestamp(253402272000L, 253402272000000L)
+      checkLongToTimestamp(-5L, -5000L)
+      checkLongToTimestamp(1L, 1000L)
+      checkLongToTimestamp(0L, 0L)
+      checkLongToTimestamp(123L, 123000L)
+    }
+    withSQLConf(
+      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key -> "true",
+      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key -> "true") {
+      def checkLongToTimestamp(l: Long, expected: Long): Unit = {
+        checkEvaluation(cast(l, TimestampType), expected)
+      }
+      checkLongToTimestamp(253402272000L, 253402272000000000L)
+      checkLongToTimestamp(-5L, -5000000L)
+      checkLongToTimestamp(1L, 1000000L)
+      checkLongToTimestamp(0L, 0L)
+      checkLongToTimestamp(123L, 123000000L)
+    }
+
+    withSQLConf(
+      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key -> "false",
+      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key -> "false") {
+      def checkByteToTimestamp(b: Byte, expected: Long): Unit = {
+        assert(!cast(b, TimestampType).resolved)
+      }
+      def checkShortToTimestamp(s: Short, expected: Long): Unit = {
+        assert(!cast(s, TimestampType).resolved)
+      }
+      def checkIntToTimestamp(str: Int, expected: Long): Unit = {
+        assert(!cast(str, TimestampType).resolved)
+      }
+      def checkLongToTimestamp(l: Long, expected: Long): Unit = {
+        assert(!cast(l, TimestampType).resolved)
+      }
+      def checkDecimalToTimestamp(d: Decimal, expected: Long): Unit = {
+        assert(!cast(d, TimestampType).resolved)
+      }
+      def checkFloatToTimestamp(f: Float, expected: Long): Unit = {
+        assert(!cast(f, TimestampType).resolved)
+      }
+      def checkDoubleToTimestamp(d: Double, expected: Long): Unit = {
+        assert(!cast(d, TimestampType).resolved)
+      }
+      checkByteToTimestamp(1.toByte, 0L)
+      checkShortToTimestamp(1.toShort, 0L)
+      checkIntToTimestamp(1, 0L)
+      checkLongToTimestamp(1L, 0L)
+      checkDecimalToTimestamp(Decimal(1.5), 0L)
+      checkFloatToTimestamp(1.5f, 0L)
+      checkDoubleToTimestamp(2.1D, 0L)
+    }
+
+    withSQLConf(
+      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key -> "false",
+      SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key -> "true") {
+      def checkByteToTimestamp(b: Byte, expected: Long): Unit = {
+        assert(!cast(b, TimestampType).resolved)
+      }
+      def checkShortToTimestamp(s: Short, expected: Long): Unit = {
+        assert(!cast(s, TimestampType).resolved)
+      }
+      def checkIntToTimestamp(str: Int, expected: Long): Unit = {
+        assert(!cast(str, TimestampType).resolved)
+      }
+      def checkLongToTimestamp(l: Long, expected: Long): Unit = {
+        assert(!cast(l, TimestampType).resolved)
+      }
+      def checkDecimalToTimestamp(d: Decimal, expected: Long): Unit = {
+        assert(!cast(d, TimestampType).resolved)
+      }
+      def checkFloatToTimestamp(f: Float, expected: Long): Unit = {
+        assert(!cast(f, TimestampType).resolved)
+      }
+      def checkDoubleToTimestamp(d: Double, expected: Long): Unit = {
+        assert(!cast(d, TimestampType).resolved)
+      }
+
+      checkByteToTimestamp(1.toByte, 0L)
+      checkShortToTimestamp(1.toShort, 0L)
+      checkIntToTimestamp(1, 0L)
+      checkLongToTimestamp(1L, 0L)
+      checkDecimalToTimestamp(Decimal(1.5), 0L)
+      checkFloatToTimestamp(1.5f, 0L)
+      checkDoubleToTimestamp(2.1D, 0L)
     }
   }
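For completeness, the same behaviour exercised end to end from SQL rather than through checkEvaluation; a hedged sketch assuming a SparkSession named spark built with these patched configs available:

  import org.apache.spark.sql.internal.SQLConf

  spark.conf.set(SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_ENABLE.key, "true")
  spark.conf.set(SQLConf.LEGACY_NUMERIC_CONVERT_TO_TIMESTAMP_IN_SECONDS.key, "true")
  // 123 is interpreted as seconds since the epoch, i.e. 1970-01-01 00:02:03 UTC.
  spark.sql("SELECT CAST(123L AS TIMESTAMP) AS ts").show(truncate = false)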
