Skip to content

[SPARK-26745][SQL] Skip empty lines in JSON-derived DataFrames when skipParsing optimization in effect #23665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class FailureSafeParser[IN](
mode: ParseMode,
schema: StructType,
columnNameOfCorruptRecord: String,
isMultiLine: Boolean) {
isMultiLine: Boolean,
unparsedRecordIsNonEmpty: IN => Boolean = (_: IN) => true) {

private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
Expand All @@ -55,11 +56,15 @@ class FailureSafeParser[IN](

def parse(input: IN): Iterator[InternalRow] = {
try {
if (skipParsing) {
Iterator.single(InternalRow.empty)
} else {
rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
}
if (skipParsing) {
if (unparsedRecordIsNonEmpty(input)) {
Copy link
Member

@HyukjinKwon HyukjinKwon Jan 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should rather revert #21909. I think #21909 was a bandaid fix and this is another bandaid fix for that.

JacksonParser itself can produce no record or multiple records. Previous code path assumed that it always produce a single record, and the current fix it checked the input again outside of JacksonParser.

There is another problem from #21909 . It also looks going to produce incorrect counts when the input json is an array:

$ cat tmp.json
[{"a": 1}, {"a": 2}]

Current master:

scala> spark.read.json("tmp.json").show()
+---+
|  a|
+---+
|  1|
|  2|
+---+


scala> spark.read.json("tmp.json").count()
res1: Long = 1

Spark 2.3.1:

scala> spark.read.json("tmp.json").show()
+---+
|  a|
+---+
|  1|
|  2|
+---+


scala> spark.read.json("tmp.json").count()
res1: Long = 2

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @cloud-fan and @gatorsmile, if you don't mind, I would like to revert #21909. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a good catch! I think the idea of count optimization still makes sense, but our parser is so flexible and there are many corner cases we need to think of.

cc @MaxGekk , how hard do you think it is to fix it? If it's too hard, +1 to revert it. I think a safer approach maybe, only enable this count optimization under some certain cases that are 100% safe. (whitelist).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, that's safer approach if possible but to do that, we should manually check the input like the current PR. It doesn't looks a good idea to me to check input outside of JacksonParser.

The problem is, we cannot distinguish the cases below without parsing:

[{...}, {...}]
[]
{...}
# empty string

One line (input: IN) can be, 0 record, 1 record and multiple records.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case when an user sets StructType for arrays, can be excluded from the count optimization in advance.

Regarding empty (blank) string, before #23543 they are considered as bad records (appear in results). And count() produced pretty consistent results.

As far as you know, in the case of count we have empty StructType as the required schema. It means we don't have to fully parse the input and convert all field to desired types. It means count() can "count" bad records. And we cannot compare number of rows in show() output to count(). There are always the case when the number of rows can be different. I think we should answer to more generic question - which input conform to empty StructType(). After that it should be clear what kind of optimization can be applied for count(). Possible answers:

  • readable string by Hadoop LineReader
  • any text with length > 0 ( we can do cheap filtering linesReader.filter(_.getLength > 0)
  • any valid UTF8 String (Hadoop's LineReader does not check that)
  • anything parsable by Jackson parser
  • Anything on which FailureSafeParser doesn't produce bad records (in PERMISSIVE mode).
  • String contains opened { and closed }. And anything in between.
  • Valid JSON record. Parsable by Jackson in our case.
  • Valid JSON + correct field values (can be converted to specified types).
  • something else

Till we answer to the above question, reverting of the #21909 just move us from one "bad" behavior to one another "bad" behavior.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[{...}, {...}] => 2
[] => 0
{...} => 1
# empty string => 0

I think the key here is, one line can produce 0 or 1 or more records, how to speed it up when we only care about counts? It looks to me that we can enable the count optimization only for {...}, and fallback to parsing for other cases. @MaxGekk do you think this is applicable? If it's a simple fix, let's do it for branch 2.4 as well, otherwise +1 for reverting it from 2.4 and re-do it at master.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like, how are we going to explain this to users?

Outside of datasources, count() has very well defined semantic - number of rows matched to required schema. In the case of count(), the required schema is Struct(Array()). From user's view, count() doesn't require any field presented in counted row.

If you would like to see the same number of rows in show() output and what count() returns, you need to push full datasource schema as required schema otherwise there are always malformed input on which you will see different results.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otherwise there are always malformed input on which you will see different results

I think for permissive mode, the results(at least the counts) are always same even if some input are malformed? Otherwise, it seems like users only want to count the number of lines, and they should read the json files as text and do count.

Copy link
Member

@HyukjinKwon HyukjinKwon Jan 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re: #23665 (comment)

This is reasoning about the count. (see below, continues)

I think for permissive mode, the results(at least the counts) are always same even if some input are malformed?

To be 100% about the correct results, we should always parse everything although we're doing the current way for optimization and it started to have some inconsistent results.

Yes, so we don't convert 100%. In that case, we should at least parse StructType() which I guess empty object {...}. I think that's what JSON did before the pointed PRs above.

Otherwise, it seems like users only want to count the number of lines, and they should read the json files as text and do count.

Yes, I agree. It shouldn't behaves like text source + count(). Let's revert anyway. I don't think this behaviour is ideal anyway.

For other behaviours, I was thinking about making a README.md that whitelists behaviours for both CSV and JSON for Spark developers under somewhere related JSON and CSV directory. It's a bit grunting job but sounds like it should be done. I could do this together @MaxGekk since he has worked on this area a lot as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks to me that we can enable the count optimization only for {...}, and fallback to parsing for other cases. @MaxGekk do you think this is applicable? If it's a simple fix, let's do it for branch 2.4 as well, otherwise +1 for reverting it from 2.4 and re-do it at master.

We can but we should add a if-else for each input + checking some characters since it targets to avoid to parse.

Iterator.single(InternalRow.empty)
} else {
Iterator.empty
}
} else {
rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to modify this file? Since this is a issue in json stuffs, I think its better to handle this case in the json parser side. Can't we do handle this in the same way with CSV one?, e.g.,

val filteredLines: Iterator[String] = CSVExprUtils.filterCommentAndEmpty(lines, options)

Copy link
Author

@sumitsu sumitsu Jan 28, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of my earlier revisions worked in the way you suggest (if I've understood your point correctly); I changed it in order to avoid redundant empty-line filtering in the case where the full parser has to run anyway (i.e. where skipParsing == false).

What do you think? Is that a valid optimization, or is it better to do it on the JSON side as in 13942b8 to avoid changes to FailureSafeParser?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this: maropu@f4df907
But, you should follow other guys who are familiar this part, @HyukjinKwon and @MaxGekk

} catch {
case e: BadRecordException => mode match {
case PermissiveMode =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
package org.apache.spark.sql.execution.datasources.json

import java.io.InputStream
import java.lang.Character.isWhitespace
import java.net.URI
import java.nio.ByteBuffer

import scala.collection.Iterator.continually

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import com.google.common.io.ByteStreams
Expand Down Expand Up @@ -125,6 +129,18 @@ object TextInputJsonDataSource extends JsonDataSource {
.select("value").as(Encoders.STRING)
}

private def textLineHasNonWhitespace(rowText: Text): Boolean = {
val isAllWhitespace: Boolean =
rowText.getLength == 0 || {
val rowTextBuffer = ByteBuffer.wrap(rowText.getBytes)
continually(Text.bytesToCodePoint(rowTextBuffer))
.takeWhile(_ >= 0)
.take(rowText.getLength)
.forall(isWhitespace)
}
!isAllWhitespace
}

override def readFile(
conf: Configuration,
file: PartitionedFile,
Expand All @@ -141,7 +157,8 @@ object TextInputJsonDataSource extends JsonDataSource {
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord,
parser.options.multiLine)
parser.options.multiLine,
textLineHasNonWhitespace)
linesReader.flatMap(safeParser.parse)
}

Expand Down
7 changes: 7 additions & 0 deletions sql/core/src/test/resources/test-data/with-empty-line.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{ "a" : 1 , "b" : 2 , "c" : 3 }

{ "a" : 4 , "b" : 5 , "c" : 6 }

{ "a" : 7 , "b" : 8 , "c" : 9 }


Original file line number Diff line number Diff line change
Expand Up @@ -2426,6 +2426,23 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
countForMalformedJSON(0, Seq(""))
}

test("count() for non-multiline input with empty lines") {
val withEmptyLineData = Array(Map("a" -> 1, "b" -> 2, "c" -> 3),
Map("a" -> 4, "b" -> 5, "c" -> 6),
Map("a" -> 7, "b" -> 8, "c" -> 9))
val df = spark.read.json("src/test/resources/test-data/with-empty-line.json")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz use testFile.

// important to do this .count() first, prior to caching/persisting/computing/collecting, to
// test the non-parsed-count pathway
assert(df.count() === withEmptyLineData.length,
"JSON DataFrame unparsed-count should exclude whitespace-only lines")
// cache and collect to check that count stays stable under those operations
df.cache()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we dont need this cache.

assert(df.count() === withEmptyLineData.length,
"JSON DataFrame parsed-count should exclude whitespace-only lines")
val collected = df.collect().map(_.getValuesMap(Seq("a", "b", "c")))
assert(collected === withEmptyLineData)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz check checkAnswer.

}

test("SPARK-25040: empty strings should be disallowed") {
def failedOnEmptyString(dataType: DataType): Unit = {
val df = spark.read.schema(s"a ${dataType.catalogString}")
Expand Down