
[SPARK-25243][SQL] Use FailureSafeParser in from_json #22237


Closed · wants to merge 27 commits
Commits
c2e7078
Support any type by FailureSafeParser
MaxGekk Aug 26, 2018
fe2baa4
Use FailSafeParser in from_json
MaxGekk Aug 26, 2018
cecc8f5
Fix tests
MaxGekk Aug 26, 2018
75bdb03
Removing unused bind variable
MaxGekk Aug 26, 2018
7a8804d
Adding tests for different modes
MaxGekk Aug 26, 2018
01b63f1
Improve a test
MaxGekk Aug 26, 2018
b1894d2
Fix a sql test
MaxGekk Aug 26, 2018
b76b8d3
Addressing Takeshi's review comments
MaxGekk Aug 28, 2018
104ee44
Restricting to PERMISSIVE and FAILFAST modes so far
MaxGekk Aug 28, 2018
a87785a
Changing the test according to the PERMISSIVE mode
MaxGekk Aug 29, 2018
c3091b3
Renaming schema -> dataType
MaxGekk Aug 30, 2018
55be20b
Throwing AnalysisException instead of IllegalArgumentException
MaxGekk Aug 30, 2018
ce49b24
Check that the AnalysisException is thrown during producing a plan
MaxGekk Aug 31, 2018
57eb59f
Replacing AnalysisException by IllegalArgumentException
MaxGekk Sep 10, 2018
20b7522
Fix a test
MaxGekk Sep 18, 2018
63b8b66
Improving the test
MaxGekk Sep 18, 2018
a5489f5
Removing the BadRecordException handler
MaxGekk Sep 18, 2018
9904903
Updating test's title
MaxGekk Sep 18, 2018
bda3a4e
Addressing Hyukjin's review comments
MaxGekk Oct 5, 2018
fa20fd2
Produce null and put input to the corrupted column if an input array …
MaxGekk Oct 9, 2018
2663696
Removing special handling of empty strings
MaxGekk Oct 9, 2018
b84b343
Don't allow parsing arrays as structs by from_json
MaxGekk Oct 9, 2018
e22b974
Modified a test to check the spark.sql.columnNameOfCorruptRecord config
MaxGekk Oct 10, 2018
4157141
Improving tests
MaxGekk Oct 10, 2018
54be09c
Fix a python test
MaxGekk Oct 11, 2018
3f04f7f
add migration guide
HyukjinKwon Oct 24, 2018
b2988c7
address comments
HyukjinKwon Oct 24, 2018
2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1694,7 +1694,7 @@ test_that("column functions", {
df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
schema2 <- structType(structField("date", "date"))
s <- collect(select(df, from_json(df$col, schema2)))
expect_equal(s[[1]][[1]], NA)
expect_equal(s[[1]][[1]]$date, NA)
Member

What is the reason we made this change?

Member Author (@MaxGekk, Nov 19, 2018)

Do you mean this particular line or in general?

This line was changed because in the PERMISSIVE mode we usually return a Row with null values for the fields we weren't able to parse, instead of just null for the whole row.

In general, the change is to support the PERMISSIVE and FAILFAST modes, as in the JSON datasource. Before these changes, from_json didn't support any modes, nor the columnNameOfCorruptRecord option in particular.
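
For illustration, a minimal Scala sketch of the same scenario as the R test above (assumes an existing SparkSession `spark`; output in comments is approximate):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

val df = Seq("""{"date":"21/10/2014"}""").toDF("col")
val schema = new StructType().add("date", DateType)

// The date doesn't match the default format, so the field can't be parsed; in the
// default PERMISSIVE mode the result is now a struct with a null field rather than
// a null struct:
df.select(from_json($"col", schema)).collect()
// Array([[null]])  -- i.e. Row(Row(null)); before this change it was Row(null)

// With a matching dateFormat the field parses as before:
df.select(from_json($"col", schema, Map("dateFormat" -> "dd/MM/yyyy"))).collect()
// Array([[2014-10-21]])
```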

s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy")))
expect_is(s[[1]][[1]]$date, "Date")
expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21")
2 changes: 2 additions & 0 deletions docs/sql-migration-guide-upgrade.md
@@ -13,6 +13,8 @@ displayTitle: Spark SQL Upgrading Guide

- In Spark version 2.4 and earlier, the parser of JSON data source treats empty strings as null for some data types such as `IntegerType`. For `FloatType` and `DoubleType`, it fails on empty strings and throws exceptions. Since Spark 3.0, we disallow empty strings and will throw exceptions for data types except for `StringType` and `BinaryType`.

- Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, the behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially in the processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` was converted to `null` by previous versions, but Spark 3.0 converts it to `Row(null)`.

## Upgrading From Spark SQL 2.3 to 2.4

- In Spark version 2.3 and earlier, the second parameter to array_contains function is implicitly promoted to the element type of first array type parameter. This type promotion can be lossy and may cause `array_contains` function to return wrong result. This problem has been addressed in 2.4 by employing a safer type promotion mechanism. This can cause some change in behavior and is illustrated in the table below.
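
A short sketch of the behavior described in the migration note above, not part of the diff (assumes an existing SparkSession `spark`):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

val df = Seq("""{"a" 1}""").toDS()                  // malformed: the colon is missing
val schema = new StructType().add("a", IntegerType)

// Default (PERMISSIVE) mode: the malformed record becomes Row(null) instead of null.
df.select(from_json($"value", schema)).collect()
// Array([[null]])

// FAILFAST mode: the same record makes the query fail.
df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect()
// org.apache.spark.SparkException: Malformed records are detected in record parsing.
// Parse Mode: FAILFAST.
```
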
2 changes: 1 addition & 1 deletion python/pyspark/sql/functions.py
@@ -2305,7 +2305,7 @@ def from_json(col, schema, options={}):
[Row(json=[Row(a=1)])]
>>> schema = schema_of_json(lit('''{"a": 0}'''))
>>> df.select(from_json(df.value, schema).alias("json")).collect()
[Row(json=Row(a=1))]
[Row(json=Row(a=None))]
>>> data = [(1, '''[1, 2, 3]''')]
>>> schema = ArrayType(IntegerType())
>>> df = spark.createDataFrame(data, ("key", "value"))
@@ -554,54 +554,44 @@ case class JsonToStructs(
@transient
lazy val converter = nullableSchema match {
case _: StructType =>
(rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
Contributor

so we may still return null in some cases, can you list them?

Member Author

We shouldn't return null. I will replace the null with exceptions, like I did for from_csv: https://github.com/apache/spark/pull/22379/files#diff-5321c01e95bffc4413c5f3457696213eR83

Member Author (@MaxGekk, Oct 9, 2018)

Actually, I am wrong: we return an empty iterator and, as a consequence, null in the case when there are no input tokens.

We could throw BadRecordException instead of returning Nil, but this would change the behavior of the JSON/CSV datasources: they would then return rows with nulls for empty lines (in PERMISSIVE mode).

Contributor (@cloud-fan, Oct 10, 2018)

we don't have to do it in this PR, but it would be great to document when this expression will return null, in the class doc of JsonToStructs

Contributor

... and with some tests to verify it.

Member Author

we don't have to do it in this PR, but it would be great to document when this expression will return null ...

We already state in the docs for from_json(): Returns null, in the case of an unparseable string.

Member Author

For now the case is more concrete: we return null if the Jackson parser doesn't find any token in the input. I am not sure this detailed info about the underlying problem would help users much.
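
A minimal illustration of that case (assumes an existing SparkSession `spark`; behavior as described in this thread):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

val schema = new StructType().add("a", IntegerType)

// Jackson finds no token in a blank string, so the raw parser yields an empty
// iterator and the converter falls back to null for the whole struct column:
Seq("", "   ").toDS().select(from_json($"value", schema)).collect()
// Array([null], [null])  -- the struct itself is null, not Row(null)
```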

case _: ArrayType =>
(rows: Seq[InternalRow]) => rows.head.getArray(0)
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
case _: MapType =>
(rows: Seq[InternalRow]) => rows.head.getMap(0)
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
}

@transient
lazy val parser =
new JacksonParser(
nullableSchema,
new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
@transient lazy val parser = {
val parsedOptions = new JSONOptions(options, timeZoneId.get, nameOfCorruptRecord)
val mode = parsedOptions.parseMode
if (mode != PermissiveMode && mode != FailFastMode) {

This comment was marked as resolved.

Member Author

Do you mean by introducing a new val failFast to JSONOptions?

Contributor (@cloud-fan, Sep 26, 2018)

yea, instead of the "mode" option.

Member Author

JSONOptions is shared between the built-in JSON functions like from_json and the JSON datasource, and the datasource uses 3 modes: FAILFAST, DROPMALFORMED and PERMISSIVE. I am not sure how the mode option could be replaced. The approach I could imagine is to inherit from JSONOptions and add a new val. The mode itself cannot be removed because it is used in FailureSafeParser, for example; in particular, DropMalformedMode is handled explicitly.

Contributor

Ah, I see. If the mode option already exists, let's keep it.

throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " +
s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
}
val rawParser = new JacksonParser(nullableSchema, parsedOptions, allowArrayAsStructs = false)
val createParser = CreateJacksonParser.utf8String _

val parserSchema = nullableSchema match {
case s: StructType => s
case other => StructType(StructField("value", other) :: Nil)
}

new FailureSafeParser[UTF8String](
input => rawParser.parse(input, createParser, identity[UTF8String]),
mode,
parserSchema,
parsedOptions.columnNameOfCorruptRecord,
parsedOptions.multiLine)
}

override def dataType: DataType = nullableSchema

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
copy(timeZoneId = Option(timeZoneId))

override def nullSafeEval(json: Any): Any = {
// When input is,
// - `null`: `null`.
// - invalid json: `null`.
// - empty string: `null`.
//
// When the schema is array,
// - json array: `Array(Row(...), ...)`
// - json object: `Array(Row(...))`
// - empty json array: `Array()`.
// - empty json object: `Array(Row(null))`.
//
// When the schema is a struct,
// - json object/array with single element: `Row(...)`
// - json array with multiple elements: `null`
// - empty json array: `null`.
// - empty json object: `Row(null)`.

// We need `null` if the input string is an empty string. `JacksonParser` can
// deal with this but produces `Nil`.
if (json.toString.trim.isEmpty) return null

try {
converter(parser.parse(
json.asInstanceOf[UTF8String],
CreateJacksonParser.utf8String,
identity[UTF8String]))
} catch {
case _: BadRecordException => null
}
converter(parser.parse(json.asInstanceOf[UTF8String]))
}

override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
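
For context on the parser wiring above: the raw JacksonParser is now wrapped so that per-record failures are resolved according to the parse mode instead of being swallowed as null. A very rough sketch of that idea, using hypothetical stand-in names rather than Spark's real FailureSafeParser API:

```scala
// Hypothetical stand-in for org.apache.spark.sql.catalyst.util.BadRecordException.
final case class BadRecord(raw: String) extends Exception(raw)

// Sketch of per-record failure handling; T stands for the parsed row type.
def parseSafely[T](
    rawParse: String => Iterator[T],   // stand-in for rawParser.parse(...)
    toNullRow: String => T,            // null fields plus raw text in columnNameOfCorruptRecord
    mode: String)(record: String): Iterator[T] =
  try {
    rawParse(record)
  } catch {
    case BadRecord(raw) => mode match {
      case "PERMISSIVE" => Iterator.single(toNullRow(raw))
      case "FAILFAST" =>
        throw new RuntimeException(
          "Malformed records are detected in record parsing. Parse Mode: FAILFAST.")
      case _ => Iterator.empty         // e.g. DROPMALFORMED silently drops the record
    }
  }
```
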
@@ -38,7 +38,8 @@ import org.apache.spark.util.Utils
*/
class JacksonParser(
schema: DataType,
val options: JSONOptions) extends Logging {
val options: JSONOptions,
allowArrayAsStructs: Boolean) extends Logging {

import JacksonUtils._
import com.fasterxml.jackson.core.JsonToken._
@@ -84,7 +85,7 @@
// List([str_a_1,null])
// List([str_a_2,null], [null,str_b_3])
//
case START_ARRAY =>
case START_ARRAY if allowArrayAsStructs =>
val array = convertArray(parser, elementConverter)
// Here, as we support reading top level JSON arrays and take every element
// in such an array as a row, this case is possible.
@@ -93,6 +94,8 @@
} else {
array.toArray[InternalRow](schema).toSeq
}
case START_ARRAY =>
throw new RuntimeException("Parsing JSON arrays as structs is forbidden.")
}
}

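Rough illustration of what the new allowArrayAsStructs flag distinguishes (assumes an existing SparkSession `spark`; output in comments is approximate):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

val arrayInput = Seq("""[{"a": 1}, {"a": 2}]""").toDS()
val structSchema = new StructType().add("a", IntegerType)

// The JSON datasource passes allowArrayAsStructs = true, so a top-level JSON array
// still becomes one row per element:
spark.read.schema(structSchema).json(arrayInput).collect()
// Array([1], [2])

// from_json passes allowArrayAsStructs = false, so the same input is treated as a
// malformed record for a struct schema; in PERMISSIVE mode that means a row of nulls
// (or the raw text in columnNameOfCorruptRecord, if that column is in the schema):
arrayInput.select(from_json($"value", structSchema)).collect()
// Array([[null]])
```
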
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.expressions

import java.util.Calendar

import org.apache.spark.SparkFunSuite
import org.scalatest.exceptions.TestFailedException

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.plans.PlanTestBase
@@ -409,14 +411,18 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
null
InternalRow(null)
)

// Other modes should still return `null`.
checkEvaluation(
JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId),
null
)
val exception = intercept[TestFailedException] {
checkEvaluation(
JsonToStructs(schema, Map("mode" -> FailFastMode.name), Literal(jsonData), gmtId),
InternalRow(null)
)
}.getCause
assert(exception.isInstanceOf[SparkException])
assert(exception.getMessage.contains(
"Malformed records are detected in record parsing. Parse Mode: FAILFAST"))
}

test("from_json - input=array, schema=array, output=array") {
@@ -450,21 +456,23 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
test("from_json - input=array of single object, schema=struct, output=single row") {
val input = """[{"a": 1}]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = InternalRow(1)
val output = InternalRow(null)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}

test("from_json - input=array, schema=struct, output=null") {
test("from_json - input=array, schema=struct, output=single row") {
val input = """[{"a": 1}, {"a": 2}]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = null
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
val corrupted = "corrupted"
val schema = new StructType().add("a", IntegerType).add(corrupted, StringType)
val output = InternalRow(null, UTF8String.fromString(input))
val options = Map("columnNameOfCorruptRecord" -> corrupted)
checkEvaluation(JsonToStructs(schema, options, Literal(input), gmtId), output)
}

test("from_json - input=empty array, schema=struct, output=null") {
test("from_json - input=empty array, schema=struct, output=single row with null") {
val input = """[]"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
val output = null
val output = InternalRow(null)
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
}

@@ -487,7 +495,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStructs(schema, Map.empty, Literal(badJson), gmtId),
null)
InternalRow(null))
}

test("from_json with timestamp") {
@@ -446,7 +446,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {

val createParser = CreateJacksonParser.string _
val parsed = jsonDataset.rdd.mapPartitions { iter =>
val rawParser = new JacksonParser(actualSchema, parsedOptions)
val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
val parser = new FailureSafeParser[String](
input => rawParser.parse(input, createParser, UTF8String.fromString),
parsedOptions.parseMode,
@@ -130,7 +130,7 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
}

(file: PartitionedFile) => {
val parser = new JacksonParser(actualSchema, parsedOptions)
val parser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
JsonDataSource(parsedOptions).readFile(
broadcastedHadoopConf.value.value,
file,
@@ -258,4 +258,4 @@ select from_json(a, 'a INT') from t
-- !query 31 schema
struct<from_json(a):struct<a:int>>
-- !query 31 output
NULL
{"a":null}
@@ -19,7 +19,9 @@ package org.apache.spark.sql

import collection.JavaConverters._

import org.apache.spark.SparkException
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -132,7 +134,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {

checkAnswer(
df.select(from_json($"value", schema)),
Row(null) :: Nil)
Row(Row(null)) :: Nil)
}

test("from_json - json doesn't conform to the array type") {
Expand Down Expand Up @@ -547,4 +549,33 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
Map("pretty" -> "true"))),
Seq(Row(expected)))
}

test("from_json invalid json - check modes") {
withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
val schema = new StructType()
.add("a", IntegerType)
.add("b", IntegerType)
.add("_unparsed", StringType)
val badRec = """{"a" 1, "b": 11}"""
val df = Seq(badRec, """{"a": 2, "b": 12}""").toDS()

checkAnswer(
df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
Row(Row(null, null, badRec)) :: Row(Row(2, 12, null)) :: Nil)

val exception1 = intercept[SparkException] {
df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect()
}.getMessage
assert(exception1.contains(
"Malformed records are detected in record parsing. Parse Mode: FAILFAST."))

val exception2 = intercept[SparkException] {
df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED")))
.collect()
}.getMessage
assert(exception2.contains(
"from_json() doesn't support the DROPMALFORMED mode. " +
"Acceptable modes are PERMISSIVE and FAILFAST."))
}
}
}
@@ -67,7 +67,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {

val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
val dummySchema = StructType(Seq.empty)
val parser = new JacksonParser(dummySchema, dummyOption)
val parser = new JacksonParser(dummySchema, dummyOption, allowArrayAsStructs = true)

Utils.tryWithResource(factory.createParser(writer.toString)) { jsonParser =>
jsonParser.nextToken()