-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-25243][SQL] Use FailureSafeParser in from_json #22237
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
c2e7078
fe2baa4
cecc8f5
75bdb03
7a8804d
01b63f1
b1894d2
b76b8d3
104ee44
a87785a
c3091b3
55be20b
ce49b24
57eb59f
20b7522
63b8b66
a5489f5
9904903
bda3a4e
fa20fd2
2663696
b84b343
e22b974
4157141
54be09c
3f04f7f
b2988c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -554,54 +554,44 @@ case class JsonToStructs( | |||
@transient | ||||
lazy val converter = nullableSchema match { | ||||
case _: StructType => | ||||
(rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null | ||||
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so we may still return null in some cases, can you list them? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We shouldn't return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually I am wrong we return empty iterator and as the consequence of that null in the case if there is no input tokens, there spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala Line 398 in a8a1ac0
We can throw There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we don't have to do it in this PR, but it would be great to document when this expression will return null, in the class doc of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ... and with some tests to verify it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We already state in the docs for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For now the case is more concrete, we return null if |
||||
case _: ArrayType => | ||||
(rows: Seq[InternalRow]) => rows.head.getArray(0) | ||||
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null | ||||
case _: MapType => | ||||
(rows: Seq[InternalRow]) => rows.head.getMap(0) | ||||
(rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null | ||||
} | ||||
|
||||
@transient | ||||
lazy val parser = | ||||
new JacksonParser( | ||||
nullableSchema, | ||||
new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get)) | ||||
val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD) | ||||
@transient lazy val parser = { | ||||
val parsedOptions = new JSONOptions(options, timeZoneId.get, nameOfCorruptRecord) | ||||
val mode = parsedOptions.parseMode | ||||
if (mode != PermissiveMode && mode != FailFastMode) { | ||||
This comment was marked as resolved.
Sorry, something went wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean by introducing new val There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yea, instead of the "mode" option. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah i see. If the |
||||
throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " + | ||||
s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.") | ||||
} | ||||
val rawParser = new JacksonParser(nullableSchema, parsedOptions, allowArrayAsStructs = false) | ||||
val createParser = CreateJacksonParser.utf8String _ | ||||
|
||||
val parserSchema = nullableSchema match { | ||||
case s: StructType => s | ||||
case other => StructType(StructField("value", other) :: Nil) | ||||
} | ||||
|
||||
new FailureSafeParser[UTF8String]( | ||||
input => rawParser.parse(input, createParser, identity[UTF8String]), | ||||
mode, | ||||
parserSchema, | ||||
parsedOptions.columnNameOfCorruptRecord, | ||||
parsedOptions.multiLine) | ||||
} | ||||
|
||||
override def dataType: DataType = nullableSchema | ||||
|
||||
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = | ||||
copy(timeZoneId = Option(timeZoneId)) | ||||
|
||||
override def nullSafeEval(json: Any): Any = { | ||||
// When input is, | ||||
// - `null`: `null`. | ||||
// - invalid json: `null`. | ||||
// - empty string: `null`. | ||||
// | ||||
// When the schema is array, | ||||
// - json array: `Array(Row(...), ...)` | ||||
// - json object: `Array(Row(...))` | ||||
// - empty json array: `Array()`. | ||||
// - empty json object: `Array(Row(null))`. | ||||
// | ||||
// When the schema is a struct, | ||||
// - json object/array with single element: `Row(...)` | ||||
// - json array with multiple elements: `null` | ||||
// - empty json array: `null`. | ||||
// - empty json object: `Row(null)`. | ||||
|
||||
// We need `null` if the input string is an empty string. `JacksonParser` can | ||||
// deal with this but produces `Nil`. | ||||
if (json.toString.trim.isEmpty) return null | ||||
|
||||
try { | ||||
converter(parser.parse( | ||||
json.asInstanceOf[UTF8String], | ||||
CreateJacksonParser.utf8String, | ||||
identity[UTF8String])) | ||||
} catch { | ||||
case _: BadRecordException => null | ||||
} | ||||
converter(parser.parse(json.asInstanceOf[UTF8String])) | ||||
} | ||||
|
||||
override def inputTypes: Seq[AbstractDataType] = StringType :: Nil | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the reason we made this change?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean this particular line or in general?
This line was changed because in the
PERMISSIVE
mode we usually return aRow
with null fields that we wasn't able to parse instead of justnull
for whole row.In general, to support the
PERMISSIVE
andFAILFAST
modes as for JSON datasource. Before the changesfrom_json
didn't support any modes and thecolumnNameOfCorruptRecord
option in particular.