Skip to content

Commit 84cb068

Browse files
committed
Revert the count() optimization in the JSON datasource introduced by SPARK-24959
1 parent 710d81e commit 84cb068

File tree

5 files changed

+17
-26
lines changed

5 files changed

+17
-26
lines changed

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -450,8 +450,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
450450
input => rawParser.parse(input, createParser, UTF8String.fromString),
451451
parsedOptions.parseMode,
452452
schema,
453-
parsedOptions.columnNameOfCorruptRecord,
454-
parsedOptions.multiLine)
453+
parsedOptions.columnNameOfCorruptRecord)
455454
iter.flatMap(parser.parse)
456455
}
457456
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
@@ -526,8 +525,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
526525
input => Seq(rawParser.parse(input)),
527526
parsedOptions.parseMode,
528527
schema,
529-
parsedOptions.columnNameOfCorruptRecord,
530-
parsedOptions.multiLine)
528+
parsedOptions.columnNameOfCorruptRecord)
531529
iter.flatMap(parser.parse)
532530
}
533531
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = csvDataset.isStreaming)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FailureSafeParser.scala

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,7 @@ class FailureSafeParser[IN](
2929
rawParser: IN => Seq[InternalRow],
3030
mode: ParseMode,
3131
schema: StructType,
32-
columnNameOfCorruptRecord: String,
33-
isMultiLine: Boolean) {
32+
columnNameOfCorruptRecord: String) {
3433

3534
private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
3635
private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
@@ -58,15 +57,9 @@ class FailureSafeParser[IN](
5857
}
5958
}
6059

61-
private val skipParsing = !isMultiLine && mode == PermissiveMode && schema.isEmpty
62-
6360
def parse(input: IN): Iterator[InternalRow] = {
6461
try {
65-
if (skipParsing) {
66-
Iterator.single(InternalRow.empty)
67-
} else {
68-
rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
69-
}
62+
rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
7063
} catch {
7164
case e: BadRecordException => mode match {
7265
case PermissiveMode =>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -203,11 +203,19 @@ class UnivocityParser(
203203
}
204204
}
205205

206+
private val doParse = if (requiredSchema.nonEmpty) {
207+
(input: String) => convert(tokenizer.parseLine(input))
208+
} else {
209+
// If `columnPruning` is enabled and only partition attributes are scanned,
210+
// `schema` becomes empty.
211+
(_: String) => InternalRow.empty
212+
}
213+
206214
/**
207215
* Parses a single CSV string and turns it into either one resulting row or no row (if the
208216
* record is malformed).
209217
*/
210-
def parse(input: String): InternalRow = convert(tokenizer.parseLine(input))
218+
def parse(input: String): InternalRow = doParse(input)
211219

212220
private val getToken = if (options.columnPruning) {
213221
(tokens: Array[String], index: Int) => tokens(index)
@@ -290,8 +298,7 @@ private[csv] object UnivocityParser {
290298
input => Seq(parser.convert(input)),
291299
parser.options.parseMode,
292300
schema,
293-
parser.options.columnNameOfCorruptRecord,
294-
parser.options.multiLine)
301+
parser.options.columnNameOfCorruptRecord)
295302
convertStream(inputStream, shouldDropHeader, tokenizer, checkHeader) { tokens =>
296303
safeParser.parse(tokens)
297304
}.flatten
@@ -339,8 +346,7 @@ private[csv] object UnivocityParser {
339346
input => Seq(parser.parse(input)),
340347
parser.options.parseMode,
341348
schema,
342-
parser.options.columnNameOfCorruptRecord,
343-
parser.options.multiLine)
349+
parser.options.columnNameOfCorruptRecord)
344350
filteredLines.flatMap(safeParser.parse)
345351
}
346352
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,7 @@ object TextInputJsonDataSource extends JsonDataSource {
139139
input => parser.parse(input, textParser, textToUTF8String),
140140
parser.options.parseMode,
141141
schema,
142-
parser.options.columnNameOfCorruptRecord,
143-
parser.options.multiLine)
142+
parser.options.columnNameOfCorruptRecord)
144143
linesReader.flatMap(safeParser.parse)
145144
}
146145

@@ -224,8 +223,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
224223
input => parser.parse[InputStream](input, streamParser, partitionedFileString),
225224
parser.options.parseMode,
226225
schema,
227-
parser.options.columnNameOfCorruptRecord,
228-
parser.options.multiLine)
226+
parser.options.columnNameOfCorruptRecord)
229227

230228
safeParser.parse(
231229
CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonBenchmarks.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,9 +194,6 @@ object JSONBenchmarks {
194194
benchmark.addCase(s"Select 1 column + count()", 3) { _ =>
195195
ds.select($"col1").filter((_: Row) => true).count()
196196
}
197-
benchmark.addCase(s"count()", 3) { _ =>
198-
ds.count()
199-
}
200197

201198
/*
202199
Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
@@ -205,7 +202,6 @@ object JSONBenchmarks {
205202
---------------------------------------------------------------------------------------------
206203
Select 10 columns + count() 9961 / 10006 1.0 996.1 1.0X
207204
Select 1 column + count() 8355 / 8470 1.2 835.5 1.2X
208-
count() 2104 / 2156 4.8 210.4 4.7X
209205
*/
210206
benchmark.run()
211207
}

0 commit comments

Comments (0)