
Commit 31721ba

sadikovi authored and MaxGekk committed
[SPARK-40646][SQL] Fix returning partial results in JSON data source and JSON functions
### What changes were proposed in this pull request?

This PR is a follow-up to [SPARK-33134](https://issues.apache.org/jira/browse/SPARK-33134) (#30031). I found another case where, depending on the order of columns, a parse failure in one JSON field breaks all of the subsequent fields, resulting in all nulls.

With a file like this:

```
{"a": {"x": 1, "y": true}, "b": {"x": 1}}
{"a": {"x": 2}, "b": {"x": 2}}
```

reading it yields column `b` as null even though it is a valid column:

```scala
val df = spark.read
  .schema("a struct<x: int, y: struct<x: int>>, b struct<x: int>")
  .json("path")
```

```
a                  b
null               null
{"x":2,"y":null}   {"x":2}
```

However, column `b` should be:

```
{"x": 1}
{"x": 2}
```

This particular example used to work in earlier Spark versions, but it was affected by SPARK-33134, which fixed a different bug (incorrect parsing in `from_json`). Because this case was not tested, we missed it at the time.

To fix both SPARK-33134 and SPARK-40646, we need to handle `PartialResultException` in the `convertArray` method so that errors in child objects are processed there. Without that handling, the code would not wrap the row in an array for `from_json`, resulting in a ClassCastException (SPARK-33134). With it in place, the `isRoot` check in `convertObject` is no longer needed, which unblocks SPARK-40646. I updated the code to handle both cases.

With these changes, we can correctly parse this case:

```scala
val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null))))
```

which previously returned `null` for the root row.

### Why are the changes needed?

Fixes a long-standing issue where a single mismatched field in a JSON record would break parsing of the entire record.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I added unit tests for SPARK-40646 as well as SPARK-33134.

Closes #38090 from sadikovi/SPARK-40646.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
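The core of the fix is one pattern applied uniformly to the container converters: catch `PartialResultException` from a child converter, keep the partially parsed value, remember the first error, and re-throw a new `PartialResultException` wrapping the whole container so the parent converter can do the same one level up. Below is a minimal, self-contained sketch of that pattern; `PartialResult` and `convertElement` are simplified stand-ins for illustration, not Spark's actual classes:

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for Spark's PartialResultException: carries whatever
// was successfully parsed plus the first error encountered.
final case class PartialResult(partialValue: Any, cause: Throwable)
  extends Exception(cause)

def convertArray(elements: Seq[String], convertElement: String => Any): Array[Any] = {
  val values = ArrayBuffer.empty[Any]
  var badRecordException: Option[Throwable] = None

  elements.foreach { el =>
    try {
      values += convertElement(el)
    } catch {
      // Keep the partially parsed value instead of discarding the whole
      // array, and remember only the first error.
      case PartialResult(partial, cause) =>
        badRecordException = badRecordException.orElse(Some(cause))
        values += partial
    }
  }

  val arrayData = values.toArray
  if (badRecordException.isEmpty) arrayData
  // Re-wrap so the parent converter one level up can apply the same treatment.
  else throw PartialResult(arrayData, badRecordException.get)
}
```

Recording only the first exception (via `orElse`) keeps error reporting deterministic while still materializing every element that did parse.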
1 parent 636bd76 commit 31721ba

File tree

3 files changed (+130, -8 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 38 additions & 7 deletions
```diff
@@ -456,7 +456,7 @@ class JacksonParser(
             schema.existenceDefaultsBitmask(index) = false
           } catch {
             case e: SparkUpgradeException => throw e
-            case NonFatal(e) if isRoot =>
+            case NonFatal(e) =>
               badRecordException = badRecordException.orElse(Some(e))
               parser.skipChildren()
           }
@@ -482,14 +482,31 @@
       fieldConverter: ValueConverter): MapData = {
     val keys = ArrayBuffer.empty[UTF8String]
     val values = ArrayBuffer.empty[Any]
+    var badRecordException: Option[Throwable] = None
+
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       keys += UTF8String.fromString(parser.getCurrentName)
-      values += fieldConverter.apply(parser)
+      try {
+        values += fieldConverter.apply(parser)
+      } catch {
+        case PartialResultException(row, cause) =>
+          badRecordException = badRecordException.orElse(Some(cause))
+          values += row
+        case NonFatal(e) =>
+          badRecordException = badRecordException.orElse(Some(e))
+          parser.skipChildren()
+      }
     }
 
     // The JSON map will never have null or duplicated map keys, it's safe to create a
     // ArrayBasedMapData directly here.
-    ArrayBasedMapData(keys.toArray, values.toArray)
+    val mapData = ArrayBasedMapData(keys.toArray, values.toArray)
+
+    if (badRecordException.isEmpty) {
+      mapData
+    } else {
+      throw PartialResultException(InternalRow(mapData), badRecordException.get)
+    }
   }
 
   /**
@@ -500,13 +517,27 @@
       fieldConverter: ValueConverter,
       isRoot: Boolean = false): ArrayData = {
     val values = ArrayBuffer.empty[Any]
+    var badRecordException: Option[Throwable] = None
+
     while (nextUntil(parser, JsonToken.END_ARRAY)) {
-      val v = fieldConverter.apply(parser)
-      if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
-      values += v
+      try {
+        val v = fieldConverter.apply(parser)
+        if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
+        values += v
+      } catch {
+        case PartialResultException(row, cause) =>
+          badRecordException = badRecordException.orElse(Some(cause))
+          values += row
+      }
     }
 
-    new GenericArrayData(values.toArray)
+    val arrayData = new GenericArrayData(values.toArray)
+
+    if (badRecordException.isEmpty) {
+      arrayData
+    } else {
+      throw PartialResultException(InternalRow(arrayData), badRecordException.get)
+    }
   }
 
   /**
```
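For reference, the `PartialResultException` caught and re-thrown above lives in Spark's `org.apache.spark.sql.catalyst.util` package; its shape is approximately the following (a sketch, check `BadRecordException.scala` in your Spark version for the authoritative definition):

```scala
import org.apache.spark.sql.catalyst.InternalRow

// Approximate shape, shown for context only; the authoritative definition
// is in sql/catalyst (BadRecordException.scala) and may differ by version.
case class PartialResultException(
    partialResult: InternalRow, // whatever was successfully parsed so far
    cause: Throwable)           // the first error hit while parsing
  extends Exception(cause)
```

At the top level the parser turns this into a bad-record error, so in PERMISSIVE mode the partially parsed row can be surfaced instead of an all-null row, which is why column `b` survives in the motivating example.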

sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala

Lines changed: 71 additions & 1 deletion
```diff
@@ -853,11 +853,81 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
     val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
     checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
     val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
-    checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null))
+    checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(Array(Row(123456, null))))
     val df4 = Seq("""{"c2": [19]}""").toDF("c0")
     checkAnswer(df4.select(from_json($"c0", MapType(StringType, st))), Row(null))
   }
 
+  test("SPARK-40646: return partial results for JSON arrays with objects") {
+    val st = new StructType()
+      .add("c1", StringType)
+      .add("c2", ArrayType(new StructType().add("a", LongType)))
+
+    // "c2" is expected to be an array of structs but it is a struct in the data.
+    val df = Seq("""[{"c2": {"a": 1}, "c1": "abc"}]""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", ArrayType(st))),
+      Row(Array(Row("abc", null)))
+    )
+  }
+
+  test("SPARK-40646: return partial results for JSON maps") {
+    val st = new StructType()
+      .add("c1", MapType(StringType, IntegerType))
+      .add("c2", StringType)
+
+    // Map "c1" has a "k2" key whose value is a string, not an integer.
+    val df = Seq("""{"c1": {"k1": 1, "k2": "A", "k3": 3}, "c2": "abc"}""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", st)),
+      Row(Row(null, "abc"))
+    )
+  }
+
+  test("SPARK-40646: return partial results for JSON arrays") {
+    val st = new StructType()
+      .add("c", ArrayType(IntegerType))
+
+    // Values in the array are strings instead of integers.
+    val df = Seq("""["a", "b", "c"]""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", ArrayType(st))),
+      Row(null)
+    )
+  }
+
+  test("SPARK-40646: return partial results for nested JSON arrays") {
+    val st = new StructType()
+      .add("c", ArrayType(ArrayType(IntegerType)))
+
+    // The second array contains a string instead of an integer.
+    val df = Seq("""[[1], ["2"]]""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", ArrayType(st))),
+      Row(null)
+    )
+  }
+
+  test("SPARK-40646: return partial results for objects with values as JSON arrays") {
+    val st = new StructType()
+      .add("c1",
+        ArrayType(
+          StructType(
+            StructField("c2", ArrayType(IntegerType)) ::
+            Nil
+          )
+        )
+      )
+
+    // Value "a" cannot be parsed as an integer,
+    // so the error cascades to "c2", making its value null.
+    val df = Seq("""[{"c1": [{"c2": ["a"]}]}]""").toDF("c0")
+    checkAnswer(
+      df.select(from_json($"c0", ArrayType(st))),
+      Row(Array(Row(null)))
+    )
+  }
+
   test("SPARK-33270: infers schema for JSON field with spaces and pass them to from_json") {
     val in = Seq("""{"a b": 1}""").toDS()
     val out = in.select(from_json($"value", schema_of_json("""{"a b": 100}""")) as "parsed")
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 21 additions & 0 deletions
```diff
@@ -3381,6 +3381,27 @@ abstract class JsonSuite
     }
   }
 
+  test("SPARK-40646: parse subsequent fields if the first JSON field does not match schema") {
+    // In this example, the first record has "a.y" as boolean but it needs to be an object.
+    // We should parse "a" as null but continue parsing "b" correctly as it is valid.
+    withTempPath { path =>
+      Seq(
+        """{"a": {"x": 1, "y": true}, "b": {"x": 1}}""",
+        """{"a": {"x": 2}, "b": {"x": 2}}""").toDF()
+        .repartition(1)
+        .write.text(path.getAbsolutePath)
+
+      val df = spark.read
+        .schema("a struct<x: int, y: struct<x: int>>, b struct<x: int>")
+        .json(path.getAbsolutePath)
+
+      checkAnswer(
+        df,
+        Seq(Row(null, Row(1)), Row(Row(2, null), Row(2)))
+      )
+    }
+  }
+
   test("SPARK-40667: validate JSON Options") {
     assert(JSONOptions.getAllOptions.size == 28)
     // Please add validation on any new Json options here
```
