
Commit 38628dd

MaxGekk authored and cloud-fan committed
[SPARK-25935][SQL] Prevent null rows from JSON parser

## What changes were proposed in this pull request?

An input without valid JSON tokens at the root level is now treated as a bad record and handled according to `mode`. Previously such input was converted to `null`. After the changes, in the `PERMISSIVE` mode the input is converted to a row with `null`s according to the schema. This allows removing the code in the `from_json` function which could produce `null` result rows.

## How was this patch tested?

By existing test suites. Some of them (`JsonSuite`, for example) had to be modified because such bad input was previously ignored silently; it is now handled according to the specified `mode`.

Closes #22938 from MaxGekk/json-nulls.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent ce7b57c commit 38628dd
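
To make the behavior change concrete, here is a minimal sketch (assuming a spark-shell session on a build that includes this patch; the session and names are illustrative):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

val schema = new StructType().add("a", IntegerType)

// " " has no root JSON token. In Spark 2.4 and earlier, from_json returned
// a top-level null for it; with this change (PERMISSIVE is the default mode)
// it returns a row of nulls shaped by the schema.
Seq(" ").toDF("json")
  .select(from_json($"json", schema))
  .collect()
// expected: Array([[null]]) -- i.e. Row(Row(null)), not Row(null)
```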

File tree (7 files changed: +31 −25 lines)

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 1 addition & 1 deletion
@@ -1674,7 +1674,7 @@ test_that("column functions", {
 
   # check for unparseable
   df <- as.DataFrame(list(list("a" = "")))
-  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
+  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]]$a, NA)
 
   # check if array type in string is correctly supported.
   jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"

docs/sql-migration-guide-upgrade.md

Lines changed: 2 additions & 0 deletions
@@ -15,6 +15,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, the behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions, but Spark 3.0 converts it to `Row(null)`.
 
+  - In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings, and the JSON datasource skips such input regardless of its mode, if there is no valid root JSON token in the input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.
+
   - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.
 
   - In Spark version 2.4 and earlier, users can create map values with map type key via built-in functions like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data source or Java/Scala collections, though they are not very useful.
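
A sketch of the datasource side of the same change (hypothetical spark-shell session; the column names follow the migration-guide example above):

```scala
import spark.implicits._

// A line with no root JSON token used to be skipped regardless of mode;
// it is now a bad record, so PERMISSIVE yields a row of nulls for it.
val input = Seq("""{"key": "k", "value": 1}""", " ").toDS()

spark.read
  .schema("key STRING, value INT")
  .option("mode", "PERMISSIVE")
  .json(input)
  .show()
// +----+-----+
// | key|value|
// +----+-----+
// |   k|    1|
// |null| null|   <- previously this row did not appear at all
// +----+-----+
```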

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 17 additions & 9 deletions
@@ -550,15 +550,23 @@ case class JsonToStructs(
       s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
   }
 
-  // This converts parsed rows to the desired output by the given schema.
   @transient
-  lazy val converter = nullableSchema match {
-    case _: StructType =>
-      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
-    case _: ArrayType =>
-      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
-    case _: MapType =>
-      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
+  private lazy val castRow = nullableSchema match {
+    case _: StructType => (row: InternalRow) => row
+    case _: ArrayType => (row: InternalRow) => row.getArray(0)
+    case _: MapType => (row: InternalRow) => row.getMap(0)
+  }
+
+  // This converts parsed rows to the desired output by the given schema.
+  private def convertRow(rows: Iterator[InternalRow]) = {
+    if (rows.hasNext) {
+      val result = rows.next()
+      // JSON's parser produces one record only.
+      assert(!rows.hasNext)
+      castRow(result)
+    } else {
+      throw new IllegalArgumentException("Expected one row from JSON parser.")
+    }
   }
 
   val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
@@ -593,7 +601,7 @@ case class JsonToStructs(
     copy(timeZoneId = Option(timeZoneId))
 
   override def nullSafeEval(json: Any): Any = {
-    converter(parser.parse(json.asInstanceOf[UTF8String]))
+    convertRow(parser.parse(json.asInstanceOf[UTF8String]))
  }
 
   override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
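
The observable effect of the `castRow` dispatch above: for array and map schemas, the single parsed row is unwrapped to its first column rather than returned as a wrapping row. A hedged usage sketch (spark-shell assumed; the rendered output is approximate):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

// StructType: the parsed row itself is the result.
val structSchema = new StructType().add("a", IntegerType)
// ArrayType: row.getArray(0) is the result, i.e. the array value.
val arraySchema = ArrayType(structSchema)

Seq("""[{"a": 1}, {"a": 2}]""").toDF("json")
  .select(from_json($"json", arraySchema))
  .show()
// expected, roughly: [[1], [2]] -- an array of structs, not a wrapping row
```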

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 1 addition & 1 deletion
@@ -399,7 +399,7 @@ class JacksonParser(
     // a null first token is equivalent to testing for input.trim.isEmpty
     // but it works on any token stream and not just strings
     parser.nextToken() match {
-      case null => Nil
+      case null => throw new RuntimeException("Not found any JSON token")
       case _ => rootConverter.apply(parser) match {
         case null => throw new RuntimeException("Root converter returned null")
         case rows => rows
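
With this parser change, input with no root token is reported as a bad record, so `FAILFAST` now surfaces it as an error instead of silently producing nothing. A sketch (spark-shell assumed; the exact exception text may differ):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

val schema = new StructType().add("a", IntegerType)

Seq(" ").toDF("json")
  .select(from_json($"json", schema, Map("mode" -> "FAILFAST")))
  .collect()
// expected: throws (e.g. org.apache.spark.SparkException citing a malformed
// record), because " " contains no JSON token at the root level
```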

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -547,7 +547,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
       JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
-      null
+      InternalRow(null)
     )
   }

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 0 additions & 10 deletions
@@ -240,16 +240,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Seq(Row("1"), Row("2")))
   }
 
-  test("SPARK-11226 Skip empty line in json file") {
-    spark.read
-      .json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
-      .createOrReplaceTempView("d")
-
-    checkAnswer(
-      sql("select count(1) from d"),
-      Seq(Row(3)))
-  }
-
   test("SPARK-8828 sum should return null if all input values are null") {
     checkAnswer(
       sql("select sum(a), avg(a) from allNulls"),

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 9 additions & 3 deletions
@@ -1115,6 +1115,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         Row(null, null, null),
         Row(null, null, null),
         Row(null, null, null),
+        Row(null, null, null),
         Row("str_a_4", "str_b_4", "str_c_4"),
         Row(null, null, null))
     )
@@ -1136,6 +1137,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(
       jsonDF.select($"a", $"b", $"c", $"_unparsed"),
       Row(null, null, null, "{") ::
+        Row(null, null, null, "") ::
         Row(null, null, null, """{"a":1, b:2}""") ::
         Row(null, null, null, """{"a":{, b:3}""") ::
         Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1150,6 +1152,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(
       jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
       Row("{") ::
+        Row("") ::
         Row("""{"a":1, b:2}""") ::
         Row("""{"a":{, b:3}""") ::
         Row("]") :: Nil
@@ -1171,6 +1174,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(
       jsonDF.selectExpr("a", "b", "c", "_malformed"),
       Row(null, null, null, "{") ::
+        Row(null, null, null, "") ::
         Row(null, null, null, """{"a":1, b:2}""") ::
         Row(null, null, null, """{"a":{, b:3}""") ::
         Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1813,6 +1817,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val path = dir.getCanonicalPath
       primitiveFieldAndType
         .toDF("value")
+        .repartition(1)
         .write
         .option("compression", "GzIp")
         .text(path)
@@ -1838,6 +1843,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val path = dir.getCanonicalPath
       primitiveFieldAndType
         .toDF("value")
+        .repartition(1)
         .write
         .text(path)
 
@@ -1892,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .text(path)
 
       val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
-      assert(jsonDF.count() === corruptRecordCount)
+      assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
       assert(jsonDF.schema === new StructType()
         .add("_corrupt_record", StringType)
         .add("dummy", StringType))
@@ -1905,7 +1911,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         F.count($"dummy").as("valid"),
         F.count($"_corrupt_record").as("corrupt"),
         F.count("*").as("count"))
-      checkAnswer(counts, Row(1, 4, 6))
+      checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
     }
   }
 
@@ -2513,7 +2519,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
 
     checkCount(2)
-    countForMalformedJSON(0, Seq(""))
+    countForMalformedJSON(1, Seq(""))
   }
 
  test("SPARK-25040: empty strings should be disallowed") {
