
[SPARK-25243][SQL] Use FailureSafeParser in from_json #22237


Closed
wants to merge 27 commits into from

Conversation

MaxGekk
Member

@MaxGekk MaxGekk commented Aug 26, 2018

What changes were proposed in this pull request?

In the PR, I propose to switch `from_json` to `FailureSafeParser`, to make the function compatible with the `PERMISSIVE` mode by default, and to support the `FAILFAST` mode as well. The `DROPMALFORMED` mode is not supported by `from_json`.
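The intended mode semantics can be sketched in standalone Python (illustrative only — the helper name and the `_corrupt_record` default mimic, but are not, Spark's `FailureSafeParser` API):

```python
import json

def failure_safe_parse(line, mode="PERMISSIVE", corrupt_col="_corrupt_record"):
    """Illustrative sketch of how malformed input is treated per parse mode."""
    try:
        return json.loads(line)
    except ValueError:
        if mode == "PERMISSIVE":
            # keep the row, storing the raw text in the corrupt-record column
            return {corrupt_col: line}
        if mode == "FAILFAST":
            raise RuntimeError("Malformed records are detected in record "
                               "parsing. Parse Mode: FAILFAST.")
        # DROPMALFORMED (and anything else) is rejected by from_json
        raise ValueError(f"from_json() doesn't support the {mode} mode")
```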

How was this patch tested?

It was tested by the existing `JsonSuite`/`CSVSuite`, `JsonFunctionsSuite` and `JsonExpressionsSuite`, as well as by new tests for `from_json` that check different modes.

@SparkQA

SparkQA commented Aug 26, 2018

Test build #95266 has finished for PR 22237 at commit 0452a2f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk
Member Author

MaxGekk commented Aug 26, 2018

jenkins, retest this, please

@SparkQA

SparkQA commented Aug 26, 2018

Test build #95267 has finished for PR 22237 at commit 0452a2f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@transient lazy val createParser = CreateJacksonParser.utf8String _
@transient lazy val parser = new FailureSafeParser[UTF8String](
input => rawParser.parse(input, createParser, identity[UTF8String]),
parsedOptions.parseMode,
Member

@viirya viirya Aug 27, 2018

I think we should keep using the previous default mode, `FailFastMode`? Now the default mode becomes `PermissiveMode`.

Member Author

The previous setting of `FailFastMode` didn't impact the behavior because the `mode` option wasn't handled at all.

Member

@viirya viirya Aug 27, 2018

It is not handled by `JacksonParser`, and the behavior here is somewhat similar to `PermissiveMode`, as @HyukjinKwon pointed out at https://github.com/apache/spark/pull/22237/files#r212850156, but not exactly the same.

It seems that `PermissiveMode` on `FailureSafeParser` now produces a different result for corrupted records. I noticed that some existing tests may have to change because of that.


checkAnswer(
df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))),
Row(null) :: Row(Row(2)) :: Nil)
Member

How does it work for the DROPMALFORMED mode? This doesn't actually drop the record the way the JSON datasource does.

Member Author

The DROPMALFORMED mode returns null for malformed JSON lines. Users can filter them out later. @HyukjinKwon Do you know how to drop rows in UnaryExpressions?
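The drop-then-filter workflow described above can be sketched outside Spark (plain Python standing in for the DataFrame operations; `parse_or_none` is a hypothetical helper, not Spark API):

```python
import json

def parse_or_none(line):
    # from_json-style behaviour: a malformed line yields None (a null row)
    try:
        return json.loads(line)
    except ValueError:
        return None

lines = ['{"a": 1}', '{"a":', '{"a": 2}']
parsed = [parse_or_none(line) for line in lines]
# the user-side follow-up: filter the null rows out, roughly what
# df.filter(col("parsed").isNotNull) would do on a DataFrame
kept = [row for row in parsed if row is not None]
```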

Member

Nope, the only possibility I raised was to make it a generator expression. I haven't proposed a parse mode for this reason so far.

Row(Row(null)) :: Row(Row(2)) :: Nil)

val exceptionOne = intercept[SparkException] {
df.select(from_json($"value", schema, Map("mode" -> "FAILFAST"))).collect()
Member

JsonToStructs has resembled PERMISSIVE mode from the start, although their behaviours are slightly different. This is going to differ under both the PERMISSIVE and FAILFAST modes. These are actual behaviour changes if we just use PERMISSIVE mode here by default (as @viirya pointed out).

Member Author

The behavior of JsonToStructs is pretty close to PERMISSIVE, actually. I only had to make a few small changes in tests that check processing of malformed inputs.

@SparkQA

SparkQA commented Aug 27, 2018

Test build #95269 has finished for PR 22237 at commit 47dbe6b.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

nullableSchema,
new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
@transient lazy val parsedOptions = new JSONOptions(options, timeZoneId.get)
@transient lazy val rawParser = new JacksonParser(nullableSchema, parsedOptions)
Member

How about this?


  @transient lazy val parser = {
    val parsedOptions = new JSONOptions(options, timeZoneId.get)
    val rawParser = new JacksonParser(nullableSchema, parsedOptions)
    val createParser = CreateJacksonParser.utf8String _
    new FailureSafeParser[UTF8String](
      input => rawParser.parse(input, createParser, identity[UTF8String]),
      parsedOptions.parseMode,
      schema,
      parsedOptions.columnNameOfCorruptRecord,
      parsedOptions.multiLine)
  }

val actualSchema = StructType(struct.filterNot(_.name == columnNameOfCorruptRecord))
val resultRow = new GenericInternalRow(struct.length)
val nullResult = new GenericInternalRow(struct.length)
if (corruptFieldIndex.isDefined) {
Member

Can we move `actualSchema` and `resultRow` inside the `if (corruptFieldIndex.isDefined) {` block?

@HyukjinKwon
Member

HyukjinKwon commented Aug 27, 2018

I think one thing we could do for now is to support only the FAILFAST and PERMISSIVE modes and throw an exception otherwise, to match the current behaviour to the PERMISSIVE mode, and to explain that in the migration guide.
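The proposed restriction — accept only the two modes and fail on anything else — can be sketched like this (a hypothetical helper for illustration, not the actual Spark code):

```python
SUPPORTED_MODES = {"PERMISSIVE", "FAILFAST"}

def validate_mode(mode):
    # reject unsupported modes up front, before any record is parsed
    normalized = mode.upper()
    if normalized not in SUPPORTED_MODES:
        raise ValueError(
            f"from_json() doesn't support the {normalized} mode. "
            "Acceptable modes are PERMISSIVE and FAILFAST.")
    return normalized
```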

@MaxGekk
Member Author

MaxGekk commented Aug 28, 2018

> to match the current behaviour to PERMISSIVE mode, explain that in the migration guide.

@HyukjinKwon Should I target Spark 3.0 or 2.4?

@gatorsmile
Member

If we can finish it before the code freeze, it will be 2.4; otherwise it is 3.0

@SparkQA

SparkQA commented Aug 28, 2018

Test build #95378 has finished for PR 22237 at commit c81e9ae.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 30, 2018

Test build #95436 has finished for PR 22237 at commit b172e8e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@transient lazy val parser = {
val parsedOptions = new JSONOptions(options, timeZoneId.get)
val mode = parsedOptions.parseMode
require(mode == PermissiveMode || mode == FailFastMode,
Member

I think we should move this verification into the constructor.

Member

Also, can we use AnalysisException instead of require?

Member Author

I didn't put the `require` into the constructor body directly because of `timeZoneId`. If I move the check up, I need to move `val parsedOptions = new JSONOptions(options, timeZoneId.get)` up too (lazy or not). The check will force evaluation of `timeZoneId.get`, which will raise an exception. I will check this today or tomorrow.

Member

@maropu maropu Aug 30, 2018

ok, thanks!

import org.apache.spark.unsafe.types.UTF8String

class FailureSafeParser[IN](
rawParser: IN => Seq[InternalRow],
mode: ParseMode,
schema: StructType,
schema: DataType,
Member

schema -> dataType?

@SparkQA

SparkQA commented Aug 30, 2018

Test build #95464 has finished for PR 22237 at commit fee61dc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

"Malformed records are detected in record parsing. Parse Mode: FAILFAST."))

val exception2 = intercept[AnalysisException] {
df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED"))).collect()
Member

Can you fix the code to throw an analysis exception in the analysis phase instead of the execution phase (when `.collect()` is called)?

Member Author

I replaced it with `AnalysisException`, but I think it is the wrong decision. Throwing an `AnalysisException` at run time looks ugly:

Caused by: org.apache.spark.sql.AnalysisException: from_json() doesn't support the DROPMALFORMED mode. Acceptable modes are PERMISSIVE and FAILFAST.;
	at org.apache.spark.sql.catalyst.expressions.JsonToStructs.parser$lzycompute(jsonExpressions.scala:568)
	at org.apache.spark.sql.catalyst.expressions.JsonToStructs.parser(jsonExpressions.scala:564)
...
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

I am going to replace it with something else or revert to `IllegalArgumentException`.

@@ -1897,6 +1897,7 @@ working with timestamps in `pandas_udf`s to get the best performance, see
- In version 2.3 and earlier, CSV rows are considered as malformed if at least one column value in the row is malformed. CSV parser dropped such rows in the DROPMALFORMED mode or outputs an error in the FAILFAST mode. Since Spark 2.4, CSV row is considered as malformed only when it contains malformed column values requested from CSV datasource, other values can be ignored. As an example, CSV file contains the "id,name" header and one row "1234". In Spark 2.4, selection of the id column consists of a row with one column value 1234 but in Spark 2.3 and earlier it is empty in the DROPMALFORMED mode. To restore the previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.
- Since Spark 2.4, File listing for compute statistics is done in parallel by default. This can be disabled by setting `spark.sql.parallelFileListingInStatsComputation.enabled` to `False`.
- Since Spark 2.4, Metadata files (e.g. Parquet summary files) and temporary files are not counted as data files when calculating table size during Statistics computation.
- Since Spark 2.4, the from_json function supports two modes - PERMISSIVE and FAILFAST. The modes can be set via the `mode` option. The default mode became PERMISSIVE. In previous versions, the behavior of from_json did not conform to either PERMISSIVE or FAILFAST, especially in the processing of malformed JSON records.
Member

nit: from_json -> `from_json`.

@HyukjinKwon
Member

HyukjinKwon commented Aug 31, 2018

I agree with the current approach but want to make sure whether we want this in 2.4.0 or 3.0.0, since there's no way to keep the previous behaviour and code freeze is super close. I actually prefer to go ahead in 3.0.0.

@gatorsmile and @cloud-fan, WDYT? I think this will likely break existing user apps.

@SparkQA

SparkQA commented Aug 31, 2018

Test build #95532 has finished for PR 22237 at commit 1cf4213.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 31, 2018

Test build #95541 has finished for PR 22237 at commit 9ad834f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 6, 2018

Test build #95760 has finished for PR 22237 at commit a433388.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk
Member Author

MaxGekk commented Sep 6, 2018

@HyukjinKwon I re-targeted the changes for Spark 3.0. Please take a look at it one more time.

@HyukjinKwon
Member

retest this please

@HyukjinKwon
Member

Will take a look soon.

@MaxGekk
Member Author

MaxGekk commented Sep 10, 2018

> Will take a look soon.

@HyukjinKwon Thank you. Waiting for your feedback.

@SparkQA

SparkQA commented Sep 10, 2018

Test build #95867 has finished for PR 22237 at commit a433388.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 11, 2018

Test build #95896 has finished for PR 22237 at commit 26287a5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk
Member Author

MaxGekk commented Sep 13, 2018

@HyukjinKwon Please take a look at it again.

@HyukjinKwon HyukjinKwon force-pushed the from_json-failuresafe branch from d91f34f to b2988c7 Compare October 24, 2018 03:42
@HyukjinKwon
Member

https://github.com/apache/spark/pull/22237/files#r223707899 makes sense to me. Addressed. LGTM from my side as well

@cloud-fan
Contributor

LGTM, pending jenkins.

@SparkQA

SparkQA commented Oct 24, 2018

Test build #97958 has finished for PR 22237 at commit b2988c7.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Oct 24, 2018

Test build #97966 has finished for PR 22237 at commit b2988c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 4d6704d Oct 24, 2018
@HyukjinKwon
Member

Thanks all!!

@MaxGekk
Member Author

MaxGekk commented Oct 31, 2018

@HyukjinKwon Thank you for following up work on the PR. @cloud-fan @viirya @maropu Thanks for your reviews.

@@ -1694,7 +1694,7 @@ test_that("column functions", {
df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
schema2 <- structType(structField("date", "date"))
s <- collect(select(df, from_json(df$col, schema2)))
expect_equal(s[[1]][[1]], NA)
expect_equal(s[[1]][[1]]$date, NA)
Member

What is the reason we made this change?

Member Author

@MaxGekk MaxGekk Nov 19, 2018

Do you mean this particular line or in general?

This line was changed because in the PERMISSIVE mode we usually return a Row with null fields for the values we weren't able to parse, instead of just null for the whole row.

In general, to support the PERMISSIVE and FAILFAST modes as in the JSON datasource. Before the changes, from_json didn't support any modes, and the columnNameOfCorruptRecord option in particular.
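The behavioural difference behind this test change can be sketched in plain Python (an illustrative helper, not Spark code): under PERMISSIVE, an unparseable value produces a row whose fields are null rather than a null row.

```python
import json

def permissive_row(line, fields):
    """Return a dict standing in for a Row; bad input gives null fields."""
    try:
        data = json.loads(line)
        return {f: data.get(f) for f in fields}
    except ValueError:
        # the whole row survives, but every requested field is null
        return {f: None for f in fields}

good = permissive_row('{"date": "2014-10-21"}', ["date"])
bad = permissive_row('not json', ["date"])
```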

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
## What changes were proposed in this pull request?

In the PR, I propose to switch `from_json` on `FailureSafeParser`, and to make the function compatible to `PERMISSIVE` mode by default, and to support the `FAILFAST` mode as well. The `DROPMALFORMED` mode is not supported by `from_json`.

## How was this patch tested?

It was tested by existing `JsonSuite`/`CSVSuite`, `JsonFunctionsSuite` and `JsonExpressionsSuite` as well as new tests for `from_json` which checks different modes.

Closes apache#22237 from MaxGekk/from_json-failuresafe.

Lead-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@MaxGekk MaxGekk deleted the from_json-failuresafe branch August 17, 2019 13:35