[SPARK-26376][SQL] Skip inputs without tokens by JSON datasource #23325

Closed · wants to merge 3 commits
Changes from 2 commits
docs/sql-migration-guide-upgrade.md (1 addition, 1 deletion)
@@ -17,7 +17,7 @@ displayTitle: Spark SQL Upgrading Guide

- Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, the behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially when processing malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.

-- In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings, and the JSON datasource skips such inputs regardless of its mode, if there is no valid root JSON token in the input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.
+- In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings without valid root JSON tokens (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.
Member: Skipping seems to be unclear here. Could you elaborate the difference?

Member Author: Do you want to compare JSON datasource and json functions in this note?


- The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.
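The behavioral note above can be illustrated with a small model. The helper below (`from_json_permissive`) is a hypothetical pure-Python sketch, not a Spark API; it mimics what Spark 3.0's `from_json` returns in `PERMISSIVE` mode for the schema `key STRING, value INT`:

```python
import json

def from_json_permissive(s, num_fields=2):
    # Illustrative model (not Spark's API) of Spark 3.0 `from_json`
    # in PERMISSIVE mode for the schema `key STRING, value INT`.
    if s.strip() == "":
        # No root JSON token: since Spark 3.0 this is a bad record,
        # so PERMISSIVE yields a row of nulls instead of dropping it.
        return (None,) * num_fields
    try:
        obj = json.loads(s)
    except ValueError:
        # Malformed JSON is likewise a bad record in PERMISSIVE mode.
        return (None,) * num_fields
    if not isinstance(obj, dict):
        return (None,) * num_fields
    return (obj.get("key"), obj.get("value"))
```

Under this model, `from_json_permissive(" ")` yields `(None, None)`, matching the `Row(null, null)` example in the migration note.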

@@ -583,7 +583,11 @@ case class JsonToStructs(
(StructType(StructField("value", other) :: Nil), other)
}

-    val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = false)
+    val rawParser = new JacksonParser(
+      actualSchema,
+      parsedOptions,
+      allowArrayAsStructs = false,
+      skipInputWithoutTokens = false)
val createParser = CreateJacksonParser.utf8String _

new FailureSafeParser[UTF8String](
@@ -40,7 +40,8 @@ import org.apache.spark.util.Utils
class JacksonParser(
schema: DataType,
val options: JSONOptions,
-    allowArrayAsStructs: Boolean) extends Logging {
+    allowArrayAsStructs: Boolean,
+    skipInputWithoutTokens: Boolean) extends Logging {

import JacksonUtils._
import com.fasterxml.jackson.core.JsonToken._
@@ -418,6 +419,7 @@ class JacksonParser(
// a null first token is equivalent to testing for input.trim.isEmpty
// but it works on any token stream and not just strings
parser.nextToken() match {
+      case null if skipInputWithoutTokens => Nil
case null => throw new RuntimeException("Not found any JSON token")
case _ => rootConverter.apply(parser) match {
case null => throw new RuntimeException("Root converter returned null")
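The new branch above can be sketched outside of Spark. In this illustrative Python model, `first_token` stands in for Jackson's `parser.nextToken()` result, with `None` playing the role of a null first token:

```python
def parse_record(first_token, skip_input_without_tokens):
    # Sketch of the branch added to JacksonParser.parse: a null first
    # token means the input had no JSON token at all (e.g. whitespace).
    if first_token is None:
        if skip_input_without_tokens:
            return []  # Nil: the JSON datasource silently skips the input
        # from_json path: surface the input as a bad record instead
        raise RuntimeError("Not found any JSON token")
    # A real token would be handed to the root converter here (elided).
    return [first_token]
```

With the flag set, tokenless inputs produce an empty result (`Nil`); without it, they raise and are handled by the failure-safe layer as bad records.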
@@ -455,7 +455,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

val createParser = CreateJacksonParser.string _
val parsed = jsonDataset.rdd.mapPartitions { iter =>
-      val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
+      val rawParser = new JacksonParser(
+        actualSchema,
+        parsedOptions,
+        allowArrayAsStructs = true,
+        skipInputWithoutTokens = true)
val parser = new FailureSafeParser[String](
input => rawParser.parse(input, createParser, UTF8String.fromString),
parsedOptions.parseMode,
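The `rawParser` above is wrapped in `FailureSafeParser`, which applies the configured parse mode to any record the raw parser rejects. Below is a rough Python model of that dispatch; names, the exception type, and row shapes are illustrative, not Spark's actual implementation:

```python
class BadRecordException(Exception):
    # Stand-in for Spark's internal bad-record signal.
    pass

def failure_safe_parse(raw_parser, records, mode, num_fields=3):
    # Sketch of FailureSafeParser's mode handling: a bad record becomes
    # a null row (PERMISSIVE), is dropped (DROPMALFORMED), or rethrows
    # the error (FAILFAST).
    rows = []
    for record in records:
        try:
            rows.extend(raw_parser(record))
        except BadRecordException:
            if mode == "PERMISSIVE":
                rows.append((None,) * num_fields)
            elif mode == "DROPMALFORMED":
                continue
            else:  # FAILFAST
                raise
    return rows
```

This is why the skip decision has to happen inside `JacksonParser`: once an input raises, the failure-safe layer can only null it out, drop it, or fail, never silently skip it the way the 2.4 datasource did.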
@@ -125,7 +125,11 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
}

(file: PartitionedFile) => {
-      val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
+      val parser = new JacksonParser(
+        actualSchema,
+        parsedOptions,
+        allowArrayAsStructs = true,
+        skipInputWithoutTokens = true)
JsonDataSource(parsedOptions).readFile(
broadcastedHadoopConf.value.value,
file,
@@ -66,7 +66,11 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {

val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
val dummySchema = StructType(Seq.empty)
-    val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true)
+    val parser = new JacksonParser(
+      dummySchema,
+      dummyOption,
+      allowArrayAsStructs = true,
+      skipInputWithoutTokens = true)
Member: Shall we have test coverage for both true and false? In the false case, we should not skip the rows, right?

Contributor: I guess it's handled by the from_json tests. If not, let's add a new UT here.

Member Author: Both cases are covered already, for example:

Member: Thank you for clarifying that, @MaxGekk and @cloud-fan.


Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
jsonParser.nextToken()
@@ -1114,7 +1118,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
Row(null, null, null),
Row(null, null, null),
Row(null, null, null),
-      Row(null, null, null),
Row("str_a_4", "str_b_4", "str_c_4"),
Row(null, null, null))
)
@@ -1136,7 +1139,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.select($"a", $"b", $"c", $"_unparsed"),
Row(null, null, null, "{") ::
-      Row(null, null, null, "") ::
Row(null, null, null, """{"a":1, b:2}""") ::
Row(null, null, null, """{"a":{, b:3}""") ::
Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1151,7 +1153,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
Row("{") ::
-      Row("") ::
Row("""{"a":1, b:2}""") ::
Row("""{"a":{, b:3}""") ::
Row("]") :: Nil
@@ -1173,7 +1174,6 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(
jsonDF.selectExpr("a", "b", "c", "_malformed"),
Row(null, null, null, "{") ::
-      Row(null, null, null, "") ::
Row(null, null, null, """{"a":1, b:2}""") ::
Row(null, null, null, """{"a":{, b:3}""") ::
Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -2517,7 +2517,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}

checkCount(2)
-    countForMalformedJSON(1, Seq(""))
+    countForMalformedJSON(0, Seq(""))
}
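The updated expectation can be modeled in a few lines. The helper below is hypothetical (not part of the test suite); it counts malformed inputs the way the datasource does after this change, where whitespace-only inputs are skipped rather than counted as malformed:

```python
import json

def count_malformed(inputs, skip_input_without_tokens=True):
    # Models why countForMalformedJSON(1, Seq("")) became
    # countForMalformedJSON(0, Seq("")): with skipInputWithoutTokens = true,
    # tokenless inputs are skipped instead of counted as malformed.
    malformed = 0
    for s in inputs:
        if s.strip() == "":
            if not skip_input_without_tokens:
                malformed += 1  # pre-change behavior for the datasource
            continue
        try:
            json.loads(s)
        except ValueError:
            malformed += 1  # genuinely malformed JSON still counts
    return malformed
```

Under this model, `count_malformed([""])` is `0` after the change and `1` with the old behavior, while truly malformed records such as `"{"` still count in both.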

test("SPARK-25040: empty strings should be disallowed") {