
Commit 33b5039

MaxGekk authored and cloud-fan committed
[SPARK-25935][SQL] Allow null rows for bad records from JSON/CSV parsers
## What changes were proposed in this pull request?

This PR reverts #22938 per discussion in #23325.

Closes #23325
Closes #23543 from MaxGekk/return-nulls-from-json-parser.

Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent abc937b · commit 33b5039

File tree

7 files changed: +23, -28 lines


R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 1 addition & 1 deletion
```diff
@@ -1694,7 +1694,7 @@ test_that("column functions", {
 
   # check for unparseable
   df <- as.DataFrame(list(list("a" = "")))
-  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]]$a, NA)
+  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
 
   # check if array type in string is correctly supported.
   jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
```

docs/sql-migration-guide-upgrade.md

Lines changed: 0 additions & 2 deletions
```diff
@@ -17,8 +17,6 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, the `from_json` functions supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.
 
-  - In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings and JSON datasource skips the same independently of its mode if there is no valid root JSON token in its input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if specified schema is `key STRING, value INT`.
-
   - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.
 
   - In Spark version 2.4 and earlier, users can create map values with map type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data source or Java/Scala collections, though they are not very useful.
```
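The removed note describes exactly the behavior this commit restores. A minimal spark-shell sketch of the restored semantics, assuming a local `SparkSession` available as `spark` (the column name `json` is illustrative, not from the patch):

```scala
// spark-shell sketch: whitespace-only input has no valid root JSON token.
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

val schema = new StructType().add("key", StringType).add("value", IntegerType)
val df = Seq(" ").toDF("json")

// With #22938 (now reverted) this evaluated to Row(null, null);
// after this revert, from_json yields a null row again.
df.select(from_json($"json", schema)).head.isNullAt(0)   // true
```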

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 9 additions & 17 deletions
```diff
@@ -548,23 +548,15 @@ case class JsonToStructs(
       s"Input schema ${nullableSchema.catalogString} must be a struct, an array or a map.")
   }
 
-  @transient
-  private lazy val castRow = nullableSchema match {
-    case _: StructType => (row: InternalRow) => row
-    case _: ArrayType => (row: InternalRow) => row.getArray(0)
-    case _: MapType => (row: InternalRow) => row.getMap(0)
-  }
-
   // This converts parsed rows to the desired output by the given schema.
-  private def convertRow(rows: Iterator[InternalRow]) = {
-    if (rows.hasNext) {
-      val result = rows.next()
-      // JSON's parser produces one record only.
-      assert(!rows.hasNext)
-      castRow(result)
-    } else {
-      throw new IllegalArgumentException("Expected one row from JSON parser.")
-    }
+  @transient
+  lazy val converter = nullableSchema match {
+    case _: StructType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
+    case _: ArrayType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
+    case _: MapType =>
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
   }
 
   val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
@@ -600,7 +592,7 @@ case class JsonToStructs(
     copy(timeZoneId = Option(timeZoneId))
 
   override def nullSafeEval(json: Any): Any = {
-    convertRow(parser.parse(json.asInstanceOf[UTF8String]))
+    converter(parser.parse(json.asInstanceOf[UTF8String]))
   }
 
   override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
```
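The behavioral change is concentrated in the `else null` branches: where `convertRow` threw an `IllegalArgumentException` when the parser produced no rows, the new `converter` returns `null`, which surfaces as a null row. A simplified standalone sketch of the same dispatch-by-schema pattern, with plain Scala stand-ins for Spark's `InternalRow`, `ArrayData`, and `MapData` (the stand-in types are illustrative, not from the patch):

```scala
// Stand-ins for Spark's internal types, for illustration only.
sealed trait Schema
case object StructSchema extends Schema
case object ArraySchema extends Schema
case object MapSchema extends Schema

final case class Row(values: Seq[Any]) {
  def getArray(i: Int): Seq[Any] = values(i).asInstanceOf[Seq[Any]]
  def getMap(i: Int): Map[Any, Any] = values(i).asInstanceOf[Map[Any, Any]]
}

// Pick the conversion once, based on the schema. An empty iterator means
// the parser produced no record, and the result is null rather than an error.
def converterFor(schema: Schema): Iterator[Row] => Any = schema match {
  case StructSchema => rows => if (rows.hasNext) rows.next() else null
  case ArraySchema  => rows => if (rows.hasNext) rows.next().getArray(0) else null
  case MapSchema    => rows => if (rows.hasNext) rows.next().getMap(0) else null
}

// converterFor(StructSchema)(Iterator.empty) == null
```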

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -399,7 +399,7 @@ class JacksonParser(
       // a null first token is equivalent to testing for input.trim.isEmpty
       // but it works on any token stream and not just strings
       parser.nextToken() match {
-        case null => throw new RuntimeException("Not found any JSON token")
+        case null => Nil
        case _ => rootConverter.apply(parser) match {
          case null => throw new RuntimeException("Root converter returned null")
          case rows => rows
```
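The surrounding comment explains why matching on `null` works: Jackson returns a null first token exactly when the input contains no JSON value at all, including empty and whitespace-only strings. A minimal sketch against `jackson-core` (not part of the patch) demonstrating this:

```scala
import com.fasterxml.jackson.core.JsonFactory

val factory = new JsonFactory()

// Whitespace-only input: there is no root JSON token, so nextToken() is null.
assert(factory.createParser(" ").nextToken() == null)

// Valid input: the first token is the start of the root value (START_OBJECT).
assert(factory.createParser("""{"a": 1}""").nextToken() != null)
```

With this change such input yields `Nil` from the parser, so `JsonToStructs.converter` above sees an empty iterator and returns a null row instead of propagating a `RuntimeException`.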

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -548,7 +548,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
       JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
-      InternalRow(null)
+      null
     )
   }
 
```
554554

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 10 additions & 0 deletions
```diff
@@ -240,6 +240,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Seq(Row("1"), Row("2")))
   }
 
+  test("SPARK-11226 Skip empty line in json file") {
+    spark.read
+      .json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
+      .createOrReplaceTempView("d")
+
+    checkAnswer(
+      sql("select count(1) from d"),
+      Seq(Row(3)))
+  }
+
   test("SPARK-8828 sum should return null if all input values are null") {
     checkAnswer(
       sql("select sum(a), avg(a) from allNulls"),
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 1 addition & 6 deletions
```diff
@@ -1126,7 +1126,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       Row(null, null, null),
       Row(null, null, null),
       Row(null, null, null),
-      Row(null, null, null),
       Row("str_a_4", "str_b_4", "str_c_4"),
       Row(null, null, null))
     )
@@ -1148,7 +1147,6 @@
     checkAnswer(
       jsonDF.select($"a", $"b", $"c", $"_unparsed"),
       Row(null, null, null, "{") ::
-        Row(null, null, null, "") ::
         Row(null, null, null, """{"a":1, b:2}""") ::
         Row(null, null, null, """{"a":{, b:3}""") ::
         Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1163,7 +1161,6 @@
     checkAnswer(
       jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
       Row("{") ::
-        Row("") ::
         Row("""{"a":1, b:2}""") ::
         Row("""{"a":{, b:3}""") ::
         Row("]") :: Nil
@@ -1185,7 +1182,6 @@
     checkAnswer(
       jsonDF.selectExpr("a", "b", "c", "_malformed"),
       Row(null, null, null, "{") ::
-        Row(null, null, null, "") ::
         Row(null, null, null, """{"a":1, b:2}""") ::
         Row(null, null, null, """{"a":{, b:3}""") ::
         Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1727,7 +1723,6 @@
       val path = dir.getCanonicalPath
       primitiveFieldAndType
         .toDF("value")
-        .repartition(1)
         .write
         .option("compression", "GzIp")
         .text(path)
@@ -2428,7 +2423,7 @@
     }
 
     checkCount(2)
-    countForMalformedJSON(1, Seq(""))
+    countForMalformedJSON(0, Seq(""))
   }
 
   test("SPARK-25040: empty strings should be disallowed") {
```
