Skip to content

Commit bd907d0

Browse files
heyihong and cloud-fan committed
[SPARK-53524][CONNECT][SQL][4.0] Fix temporal value conversion in LiteralValueProtoConverter
### What changes were proposed in this pull request? This PR fixes temporal value conversion issues in the `LiteralValueProtoConverter` for Spark Connect. The main changes include: 1. **Fixed temporal value conversion in `getConverter` method**: Updated the conversion logic for temporal data types (DATE, TIMESTAMP, TIMESTAMP_NTZ, DAY_TIME_INTERVAL, YEAR_MONTH_INTERVAL, TIME) to use proper utility methods from `SparkDateTimeUtils` and `SparkIntervalUtils` instead of directly returning raw protobuf values. 2. **Added comprehensive test coverage**: Extended the `PlanGenerationTestSuite` with a new test case that includes a tuple containing all temporal types to ensure proper conversion and serialization. 3. **Updated test expectations**: Modified the expected explain output and query test files to reflect the corrected temporal value handling. ### Why are the changes needed? The struct type in typedlit doesn't work well with temporal values due to bugs in type conversions. For example, the code below fails: ```scala import org.apache.spark.sql.functions.typedlit spark.sql("select 1").select(typedlit((1, java.time.LocalDate.of(2020, 10, 10)))).collect() """ org.apache.spark.SparkIllegalArgumentException: The value (18545) of the type (java.lang.Integer) cannot be converted to the DATE type. 
org.apache.spark.sql.catalyst.CatalystTypeConverters$DateConverter$.toCatalystImpl(CatalystTypeConverters.scala:356) org.apache.spark.sql.catalyst.CatalystTypeConverters$DateConverter$.toCatalystImpl(CatalystTypeConverters.scala:347) org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:110) org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:271) org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251) org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:110) org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$createToCatalystConverter$2(CatalystTypeConverters.scala:532) org.apache.spark.sql.connect.planner.LiteralExpressionProtoConverter$.toCatalystExpression(LiteralExpressionProtoConverter.scala:116) """ ``` ### Does this PR introduce _any_ user-facing change? **Yes.** This PR fixes temporal value conversion in LiteralValueProtoConverter, allowing the struct type in typedlit to work with temporal values. ### How was this patch tested? `build/sbt "connect-client-jvm/testOnly org.apache.spark.sql.PlanGenerationTestSuite"` `build/sbt "connect/testOnly org.apache.spark.sql.connect.ProtoToParsedPlanTestSuite"` ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Cursor 1.5.11 Closes apache#52324 from heyihong/SPARK-53524-4.0. Lead-authored-by: Yihong He <heyihong.cn@gmail.com> Co-authored-by: Wenchen Fan <cloud0fan@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 13316c2 commit bd907d0

File tree

5 files changed

+128
-10
lines changed

5 files changed

+128
-10
lines changed

sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3390,6 +3390,16 @@ class PlanGenerationTestSuite
33903390
fn.typedLit(java.time.Duration.ofSeconds(200L)),
33913391
fn.typedLit(java.time.Period.ofDays(100)),
33923392
fn.typedLit(new CalendarInterval(2, 20, 100L)),
3393+
fn.typedLit(
3394+
(
3395+
java.time.LocalDate.of(2020, 10, 10),
3396+
java.time.Instant.ofEpochMilli(1677155519808L),
3397+
new java.sql.Timestamp(12345L),
3398+
java.time.LocalDateTime.of(2023, 2, 23, 20, 36),
3399+
java.sql.Date.valueOf("2023-02-23"),
3400+
java.time.Duration.ofSeconds(200L),
3401+
java.time.Period.ofDays(100),
3402+
new CalendarInterval(2, 20, 100L))),
33933403

33943404
// Handle parameterized scala types e.g.: List, Seq and Map.
33953405
fn.typedLit(Some(1)),

sql/connect/common/src/main/scala/org/apache/spark/sql/connect/common/LiteralValueProtoConverter.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ object LiteralValueProtoConverter {
296296
}
297297
}
298298

299-
private def getConverter(dataType: proto.DataType): proto.Expression.Literal => Any = {
299+
private def getScalaConverter(dataType: proto.DataType): proto.Expression.Literal => Any = {
300300
if (dataType.hasShort) { v =>
301301
v.getShort.toShort
302302
} else if (dataType.hasInteger) { v =>
@@ -316,15 +316,15 @@ object LiteralValueProtoConverter {
316316
} else if (dataType.hasBinary) { v =>
317317
v.getBinary.toByteArray
318318
} else if (dataType.hasDate) { v =>
319-
v.getDate
319+
SparkDateTimeUtils.toJavaDate(v.getDate)
320320
} else if (dataType.hasTimestamp) { v =>
321-
v.getTimestamp
321+
SparkDateTimeUtils.toJavaTimestamp(v.getTimestamp)
322322
} else if (dataType.hasTimestampNtz) { v =>
323-
v.getTimestampNtz
323+
SparkDateTimeUtils.microsToLocalDateTime(v.getTimestampNtz)
324324
} else if (dataType.hasDayTimeInterval) { v =>
325-
v.getDayTimeInterval
325+
SparkIntervalUtils.microsToDuration(v.getDayTimeInterval)
326326
} else if (dataType.hasYearMonthInterval) { v =>
327-
v.getYearMonthInterval
327+
SparkIntervalUtils.monthsToPeriod(v.getYearMonthInterval)
328328
} else if (dataType.hasDecimal) { v =>
329329
Decimal(v.getDecimal.getValue)
330330
} else if (dataType.hasCalendarInterval) { v =>
@@ -354,7 +354,7 @@ object LiteralValueProtoConverter {
354354
builder.result()
355355
}
356356

357-
makeArrayData(getConverter(array.getElementType))
357+
makeArrayData(getScalaConverter(array.getElementType))
358358
}
359359

360360
def toCatalystMap(map: proto.Expression.Literal.Map): mutable.Map[_, _] = {
@@ -373,7 +373,7 @@ object LiteralValueProtoConverter {
373373
builder
374374
}
375375

376-
makeMapData(getConverter(map.getKeyType), getConverter(map.getValueType))
376+
makeMapData(getScalaConverter(map.getKeyType), getScalaConverter(map.getValueType))
377377
}
378378

379379
def toCatalystStruct(struct: proto.Expression.Literal.Struct): Any = {
@@ -392,7 +392,7 @@ object LiteralValueProtoConverter {
392392
val structData = elements
393393
.zip(dataTypes)
394394
.map { case (element, dataType) =>
395-
getConverter(dataType)(element)
395+
getScalaConverter(dataType)(element)
396396
}
397397
.asInstanceOf[scala.collection.Seq[Object]]
398398
.toSeq
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Project [id#0L, id#0L, 1 AS 1#0, null AS NULL#0, true AS true#0, 68 AS 68#0, 9872 AS 9872#0, -8726532 AS -8726532#0, 7834609328726532 AS 7834609328726532#0L, 2.718281828459045 AS 2.718281828459045#0, -0.8 AS -0.8#0, 89.97620 AS 89.97620#0, 89889.7667231 AS 89889.7667231#0, connect! AS connect!#0, T AS T#0, ABCDEFGHIJ AS ABCDEFGHIJ#0, 0x78797A7B7C7D7E7F808182838485868788898A8B8C8D8E AS X'78797A7B7C7D7E7F808182838485868788898A8B8C8D8E'#0, 0x0806 AS X'0806'#0, [8,6] AS ARRAY(8, 6)#0, null AS NULL#0, 2020-10-10 AS DATE '2020-10-10'#0, 8.997620 AS 8.997620#0, 2023-02-23 04:31:59.808 AS TIMESTAMP '2023-02-23 04:31:59.808'#0, 1969-12-31 16:00:12.345 AS TIMESTAMP '1969-12-31 16:00:12.345'#0, 2023-02-23 20:36:00 AS TIMESTAMP_NTZ '2023-02-23 20:36:00'#0, ... 18 more fields]
1+
Project [id#0L, id#0L, 1 AS 1#0, null AS NULL#0, true AS true#0, 68 AS 68#0, 9872 AS 9872#0, -8726532 AS -8726532#0, 7834609328726532 AS 7834609328726532#0L, 2.718281828459045 AS 2.718281828459045#0, -0.8 AS -0.8#0, 89.97620 AS 89.97620#0, 89889.7667231 AS 89889.7667231#0, connect! AS connect!#0, T AS T#0, ABCDEFGHIJ AS ABCDEFGHIJ#0, 0x78797A7B7C7D7E7F808182838485868788898A8B8C8D8E AS X'78797A7B7C7D7E7F808182838485868788898A8B8C8D8E'#0, 0x0806 AS X'0806'#0, [8,6] AS ARRAY(8, 6)#0, null AS NULL#0, 2020-10-10 AS DATE '2020-10-10'#0, 8.997620 AS 8.997620#0, 2023-02-23 04:31:59.808 AS TIMESTAMP '2023-02-23 04:31:59.808'#0, 1969-12-31 16:00:12.345 AS TIMESTAMP '1969-12-31 16:00:12.345'#0, 2023-02-23 20:36:00 AS TIMESTAMP_NTZ '2023-02-23 20:36:00'#0, ... 19 more fields]
22
+- LocalRelation <empty>, [id#0L, a#0, b#0]

sql/connect/common/src/test/resources/query-tests/queries/function_typedLit.json

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,114 @@
652652
}
653653
}
654654
}
655+
}, {
656+
"literal": {
657+
"struct": {
658+
"structType": {
659+
"struct": {
660+
"fields": [{
661+
"name": "_1",
662+
"dataType": {
663+
"date": {
664+
}
665+
},
666+
"nullable": true
667+
}, {
668+
"name": "_2",
669+
"dataType": {
670+
"timestamp": {
671+
}
672+
},
673+
"nullable": true
674+
}, {
675+
"name": "_3",
676+
"dataType": {
677+
"timestamp": {
678+
}
679+
},
680+
"nullable": true
681+
}, {
682+
"name": "_4",
683+
"dataType": {
684+
"timestampNtz": {
685+
}
686+
},
687+
"nullable": true
688+
}, {
689+
"name": "_5",
690+
"dataType": {
691+
"date": {
692+
}
693+
},
694+
"nullable": true
695+
}, {
696+
"name": "_6",
697+
"dataType": {
698+
"dayTimeInterval": {
699+
"startField": 0,
700+
"endField": 3
701+
}
702+
},
703+
"nullable": true
704+
}, {
705+
"name": "_7",
706+
"dataType": {
707+
"yearMonthInterval": {
708+
"startField": 0,
709+
"endField": 1
710+
}
711+
},
712+
"nullable": true
713+
}, {
714+
"name": "_8",
715+
"dataType": {
716+
"calendarInterval": {
717+
}
718+
},
719+
"nullable": true
720+
}]
721+
}
722+
},
723+
"elements": [{
724+
"date": 18545
725+
}, {
726+
"timestamp": "1677155519808000"
727+
}, {
728+
"timestamp": "12345000"
729+
}, {
730+
"timestampNtz": "1677184560000000"
731+
}, {
732+
"date": 19411
733+
}, {
734+
"dayTimeInterval": "200000000"
735+
}, {
736+
"yearMonthInterval": 0
737+
}, {
738+
"calendarInterval": {
739+
"months": 2,
740+
"days": 20,
741+
"microseconds": "100"
742+
}
743+
}]
744+
}
745+
},
746+
"common": {
747+
"origin": {
748+
"jvmOrigin": {
749+
"stackTrace": [{
750+
"classLoaderName": "app",
751+
"declaringClass": "org.apache.spark.sql.functions$",
752+
"methodName": "typedLit",
753+
"fileName": "functions.scala"
754+
}, {
755+
"classLoaderName": "app",
756+
"declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
757+
"methodName": "~~trimmed~anonfun~~",
758+
"fileName": "PlanGenerationTestSuite.scala"
759+
}]
760+
}
761+
}
762+
}
655763
}, {
656764
"literal": {
657765
"integer": 1
Binary file not shown.

0 commit comments

Comments
 (0)