[SPARK-25243][SQL] Use FailureSafeParser in from_json #22237
Conversation
Test build #95266 has finished for PR 22237 at commit
jenkins, retest this, please
Test build #95267 has finished for PR 22237 at commit
@transient lazy val createParser = CreateJacksonParser.utf8String _
@transient lazy val parser = new FailureSafeParser[UTF8String](
  input => rawParser.parse(input, createParser, identity[UTF8String]),
  parsedOptions.parseMode,
I think we should keep using the previous default mode `FailFastMode`? Now the default mode becomes `PermissiveMode`.
The previous setting of `FailFastMode` didn't impact the behavior because the `mode` option wasn't handled at all.
It is not handled by `JacksonParser`, and the behavior here is somewhat similar to `PermissiveMode`, as @HyukjinKwon pointed out at https://github.com/apache/spark/pull/22237/files#r212850156, but not exactly the same. It seems `PermissiveMode` on `FailureSafeParser` now produces a different result on corrupted records. I noticed that some existing tests may have changed due to that.
checkAnswer(
  df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))),
  Row(null) :: Row(Row(2)) :: Nil)
How does it work for DROPMALFORMED mode? This doesn't actually drop the record like the JSON datasource does.
The `DROPMALFORMED` mode returns `null` for malformed JSON lines. Users can filter them out later. @HyukjinKwon Do you know how to drop rows in `UnaryExpression`s?
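For illustration, a minimal sketch of that workaround (assuming a DataFrame `df` with a string column `value`, a struct `schema`, and `spark.implicits._` in scope, as in the tests below):

import org.apache.spark.sql.functions.from_json

// from_json yields null for malformed lines in this mode, so users can
// drop those rows afterwards with a plain null filter.
val parsed = df.select(from_json($"value", schema).as("parsed"))
val withoutMalformed = parsed.filter($"parsed".isNotNull)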
Nope, the only possibility I raised was to make it a generator expression. I haven't proposed a parse mode for this reason so far.
Row(Row(null)) :: Row(Row(2)) :: Nil)
val exceptionOne = intercept[SparkException] {
  df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect()
`JsonToStructs` has resembled PERMISSIVE mode from the start, although their behaviours are slightly different. This is going to differ from both the PERMISSIVE and FAILFAST modes. These are actually behaviour changes if we just use PERMISSIVE mode here by default (as @viirya pointed out).
The behavior of `JsonToStructs` is actually pretty close to `PERMISSIVE`. I have to make just a few small changes in tests that check processing of malformed inputs.
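A sketch of the behaviour difference in question (inputs and schema are made up for illustration, and `spark.implicits._` is assumed):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}

val schema = new StructType().add("a", IntegerType)
val df = Seq("""{"a": 2}""", """{"a" 1}""").toDF("value")
df.select(from_json($"value", schema)).collect()
// Before this PR, a malformed line produced a null struct: Row(null).
// In PERMISSIVE mode it produces a struct with null fields: Row(Row(null)).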
Test build #95269 has finished for PR 22237 at commit
nullableSchema,
new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
@transient lazy val parsedOptions = new JSONOptions(options, timeZoneId.get)
@transient lazy val rawParser = new JacksonParser(nullableSchema, parsedOptions)
How about this?
@transient lazy val parser = {
  val parsedOptions = new JSONOptions(options, timeZoneId.get)
  val rawParser = new JacksonParser(nullableSchema, parsedOptions)
  val createParser = CreateJacksonParser.utf8String _
  new FailureSafeParser[UTF8String](
    input => rawParser.parse(input, createParser, identity[UTF8String]),
    parsedOptions.parseMode,
    schema,
    parsedOptions.columnNameOfCorruptRecord,
    parsedOptions.multiLine)
}
val actualSchema = StructType(struct.filterNot(_.name == columnNameOfCorruptRecord))
val resultRow = new GenericInternalRow(struct.length)
val nullResult = new GenericInternalRow(struct.length)
if (corruptFieldIndex.isDefined) {
Can we move `actualSchema` and `resultRow` inside `if (corruptFieldIndex.isDefined) {`?
I think one thing we could do for now is to support only the FAILFAST and PERMISSIVE modes and throw an exception otherwise, match the current behaviour to PERMISSIVE mode, and explain that in the migration guide.
@HyukjinKwon Should I target Spark 3.0 or 2.4?
If we can finish it before the code freeze, it will be 2.4; otherwise it is 3.0.
Test build #95378 has finished for PR 22237 at commit
Test build #95436 has finished for PR 22237 at commit
@transient lazy val parser = {
  val parsedOptions = new JSONOptions(options, timeZoneId.get)
  val mode = parsedOptions.parseMode
  require(mode == PermissiveMode || mode == FailFastMode,
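(The diff cuts the statement off here; judging by the error message quoted later in this thread, the full check presumably reads something like:)

require(mode == PermissiveMode || mode == FailFastMode,
  // Reject any mode other than PERMISSIVE and FAILFAST up front.
  s"from_json() doesn't support the ${mode.name} mode. " +
    s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")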
I think we should move this verification into the constructor.
Also, can we use `AnalysisException` instead of `require`?
I didn't put `require` into the constructor body directly because of `timeZoneId`. If I move the check up, I need to move `val parsedOptions = new JSONOptions(options, timeZoneId.get)` up too (lazy or not). The check would force evaluation of `timeZoneId.get`, which would raise an exception. I will check this today or tomorrow.
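(A sketch of the underlying problem: `timeZoneId` on a timezone-aware expression is an `Option[String]` that is only filled in during analysis, so an eager check would crash on an unresolved expression.)

// Before the analyzer assigns a time zone, timeZoneId is None,
// so evaluating the options eagerly in the constructor would throw.
val timeZoneId: Option[String] = None
val parsedOptions = new JSONOptions(options, timeZoneId.get)  // NoSuchElementException: None.get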
ok, thanks!
import org.apache.spark.unsafe.types.UTF8String

class FailureSafeParser[IN](
    rawParser: IN => Seq[InternalRow],
    mode: ParseMode,
-   schema: StructType,
+   schema: DataType,
`schema` -> `dataType`?
Test build #95464 has finished for PR 22237 at commit
"Malformed records are detected in record parsing. Parse Mode: FAILFAST.")) | ||
|
||
val exception2 = intercept[AnalysisException] { | ||
df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))).collect() |
Can you fix the code to throw an analysis exception in the analysis phase instead of the execution phase (when `.collect()` is called)?
I replaced it with `AnalysisException`, but I think it is the wrong decision. Throwing `AnalysisException` at run time looks ugly:
Caused by: org.apache.spark.sql.AnalysisException: from_json() doesn't support the DROPMALFORMED mode. Acceptable modes are PERMISSIVE and FAILFAST.;
at org.apache.spark.sql.catalyst.expressions.JsonToStructs.parser$lzycompute(jsonExpressions.scala:568)
at org.apache.spark.sql.catalyst.expressions.JsonToStructs.parser(jsonExpressions.scala:564)
...
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
I am going to replace it with something else or revert back to `IllegalArgumentException`.
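(One hypothetical alternative, not necessarily what the PR settled on: hook the mode validation into the expression's input check so a bad mode fails during analysis rather than at run time.)

import org.apache.spark.sql.catalyst.analysis.TypeCheckResult

// Sketch: validate the mode when the analyzer checks the expression.
// A default time zone is passed only to parse the options; the mode
// itself does not depend on it, sidestepping the timeZoneId.get issue.
override def checkInputDataTypes(): TypeCheckResult = {
  val mode = new JSONOptions(options, "UTC").parseMode
  if (mode != PermissiveMode && mode != FailFastMode) {
    TypeCheckResult.TypeCheckFailure(
      s"from_json() doesn't support the ${mode.name} mode. " +
        s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
  } else {
    super.checkInputDataTypes()
  }
}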
docs/sql-programming-guide.md (outdated)
@@ -1897,6 +1897,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
- Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`.
- Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation.
- Since Spark 2.4, the from_json functions supports two modes - PERMISSIVE and FAILFAST. The modes can be set via the `mode` option. The default mode became PERMISSIVE. In previous versions, behavior of from_json did not conform to either PERMISSIVE nor FAILFAST, especially in processing of malformed JSON records.
nit: from_json -> `from_json`.
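For reference, the option described in this guide entry is passed the same way as in the tests above:

// PERMISSIVE is the default; FAILFAST must be requested explicitly.
df.select(from_json($"value", schema, Map("mode" -> "FAILFAST")))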
I agree with the current approach but wanna make sure whether we want this in 2.4.0 or 3.0.0, since there's no way to keep the previous behaviour and code freeze is super close. I actually prefer to go ahead in 3.0.0. @gatorsmile and @cloud-fan, WDYT? I think this will likely break existing user apps.
Test build #95532 has finished for PR 22237 at commit
Test build #95541 has finished for PR 22237 at commit
Test build #95760 has finished for PR 22237 at commit
@HyukjinKwon I re-targeted the changes for Spark 3.0. Please take a look at it one more time.
retest this please
Will take a look soon.
@HyukjinKwon Thank you. Waiting for your feedback.
Test build #95867 has finished for PR 22237 at commit
Test build #95896 has finished for PR 22237 at commit
@HyukjinKwon Please take a look at it again.
Force-pushed “…has more than 1 element for struct schema” from d91f34f to b2988c7.
https://github.com/apache/spark/pull/22237/files#r223707899 makes sense to me. Addressed. LGTM from my side as well.
LGTM, pending jenkins.
Test build #97958 has finished for PR 22237 at commit
retest this please
Test build #97966 has finished for PR 22237 at commit
thanks, merging to master!
Thanks all!!
@HyukjinKwon Thank you for the follow-up work on the PR. @cloud-fan @viirya @maropu Thanks for your reviews.
@@ -1694,7 +1694,7 @@ test_that("column functions", {
df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
schema2 <- structType(structField("date", "date"))
s <- collect(select(df, from_json(df$col, schema2)))
- expect_equal(s[[1]][[1]], NA)
+ expect_equal(s[[1]][[1]]$date, NA)
What is the reason we made this change?
Do you mean this particular line or in general?
This line was changed because in `PERMISSIVE` mode we now return a `Row` with null fields for the values we weren't able to parse, instead of just `null` for the whole row.
In general, the change was made to support the `PERMISSIVE` and `FAILFAST` modes as in the JSON datasource. Before the changes, `from_json` didn't support any modes, and didn't support the `columnNameOfCorruptRecord` option in particular.
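(A sketch of what the newly wired-up option could enable — the column name and schema here are illustrative, and the exact `from_json` semantics should be checked against the tests:)

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// With a corrupt-record field in the schema, PERMISSIVE mode can surface
// the raw malformed input instead of silently nulling the whole struct.
val schemaWithCorrupt = new StructType()
  .add("a", IntegerType)
  .add("_corrupt_record", StringType)
df.select(from_json($"value", schemaWithCorrupt,
  Map("columnNameOfCorruptRecord" -> "_corrupt_record")))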
Closes apache#22237 from MaxGekk/from_json-failuresafe. Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com> Co-authored-by: hyukjinkwon <gurwls223@apache.org> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
In the PR, I propose to switch `from_json` to `FailureSafeParser`, to make the function compatible with `PERMISSIVE` mode by default, and to support the `FAILFAST` mode as well. The `DROPMALFORMED` mode is not supported by `from_json`.
How was this patch tested?
It was tested by the existing `JsonSuite`/`CSVSuite`, `JsonFunctionsSuite` and `JsonExpressionsSuite`, as well as by new tests for `from_json` which check different modes.
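(For a flavour of those tests, a condensed sketch assembled from the snippets earlier in this thread — `checkAnswer` and `intercept` come from Spark's test harness, and `spark.implicits._` is assumed:)

val schema = new StructType().add("a", IntegerType)
val df = Seq("""{"a" 1}""", """{"a": 2}""").toDF("value")

// PERMISSIVE (default): the malformed line becomes a struct with null fields.
checkAnswer(
  df.select(from_json($"value", schema)),
  Row(Row(null)) :: Row(Row(2)) :: Nil)

// FAILFAST: the malformed line raises an exception during execution.
val exception = intercept[SparkException] {
  df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect()
}
assert(exception.getMessage.contains(
  "Malformed records are detected in record parsing. Parse Mode: FAILFAST."))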