Commit 21fde57

Nathan Howell authored and cloud-fan committed
[SPARK-18352][SQL] Support parsing multiline json files
## What changes were proposed in this pull request?

If a new option `wholeFile` is set to `true`, the JSON reader parses each file (instead of each line) as a single value. This is done with Jackson streaming and should be capable of parsing very large documents, assuming each row fits in memory.

Because the file is not buffered in memory, corrupt-record handling is also slightly different when `wholeFile` is enabled: on a parsing failure the corrupt column contains the filename instead of the literal JSON. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired.

These changes allow types other than `String` to be parsed. Support for `UTF8String` and `Text` has been added (alongside `String` and `InputFormat`), so a conversion to `String` is no longer required just for parsing. I've also included a few other changes that generate slightly better bytecode and (imo) make it more obvious when and where boxing occurs in the parser. These are included as separate commits; let me know if they should be flattened into this PR or moved to a new one.

## How was this patch tested?

New and existing unit tests. No performance or load tests have been run.

Author: Nathan Howell <nhowell@godaddy.com>

Closes apache#16386 from NathanHowell/SPARK-18352.
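A minimal usage sketch of the option described above, from the reader API side (the input paths are illustrative; only the `wholeFile` option name comes from this change):

```scala
// Parse each input file as a single JSON value, which may span multiple lines.
// "data/people_array.json" is a hypothetical path used only for illustration.
val people = spark.read
  .option("wholeFile", "true")
  .json("data/people_array.json")

// Default behaviour (JSON Lines): one JSON record per line.
val peopleLines = spark.read.json("data/people.json")
```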
1 parent dcc2d54 commit 21fde57

17 files changed, +740 −231 lines

common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java

Lines changed: 17 additions & 3 deletions
@@ -147,7 +147,13 @@ public void writeTo(ByteBuffer buffer) {
     buffer.position(pos + numBytes);
   }
 
-  public void writeTo(OutputStream out) throws IOException {
+  /**
+   * Returns a {@link ByteBuffer} wrapping the base object if it is a byte array
+   * or a copy of the data if the base object is not a byte array.
+   *
+   * Unlike getBytes this will not create a copy the array if this is a slice.
+   */
+  public @Nonnull ByteBuffer getByteBuffer() {
     if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) {
       final byte[] bytes = (byte[]) base;
 
@@ -160,12 +166,20 @@ public void writeTo(OutputStream out) throws IOException {
         throw new ArrayIndexOutOfBoundsException();
       }
 
-      out.write(bytes, (int) arrayOffset, numBytes);
+      return ByteBuffer.wrap(bytes, (int) arrayOffset, numBytes);
     } else {
-      out.write(getBytes());
+      return ByteBuffer.wrap(getBytes());
     }
   }
 
+  public void writeTo(OutputStream out) throws IOException {
+    final ByteBuffer bb = this.getByteBuffer();
+    assert(bb.hasArray());
+
+    // similar to Utils.writeByteBuffer but without the spark-core dependency
+    out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
+  }
+
   /**
    * Returns the number of bytes for a code point with the first byte as `b`
    * @param b The first byte of a code point
core/src/main/scala/org/apache/spark/input/PortableDataStream.scala

Lines changed: 7 additions & 0 deletions
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFil
 
 import org.apache.spark.internal.config
 import org.apache.spark.SparkContext
+import org.apache.spark.annotation.Since
 
 /**
  * A general format for reading whole files in as streams, byte arrays,
@@ -175,6 +176,7 @@
    * Create a new DataInputStream from the split and context. The user of this method is responsible
    * for closing the stream after usage.
    */
+  @Since("1.2.0")
   def open(): DataInputStream = {
     val pathp = split.getPath(index)
     val fs = pathp.getFileSystem(conf)
@@ -184,6 +186,7 @@
   /**
    * Read the file as a byte array
    */
+  @Since("1.2.0")
   def toArray(): Array[Byte] = {
     val stream = open()
     try {
@@ -193,6 +196,10 @@
     }
   }
 
+  @Since("1.2.0")
   def getPath(): String = path
+
+  @Since("2.2.0")
+  def getConfiguration: Configuration = conf
 }
 

python/pyspark/sql/readwriter.py

Lines changed: 8 additions & 5 deletions
@@ -159,11 +159,12 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
-             timeZone=None):
+             timeZone=None, wholeFile=None):
         """
-        Loads a JSON file (`JSON Lines text format or newline-delimited JSON
-        <http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects (one object per
-        record) and returns the result as a :class`DataFrame`.
+        Loads a JSON file and returns the results as a :class:`DataFrame`.
+
+        Both JSON (one record per file) and `JSON Lines <http://jsonlines.org/>`_
+        (newline-delimited JSON) are supported and can be selected with the `wholeFile` parameter.
 
         If the ``schema`` parameter is not specified, this function goes
         through the input once to determine the input schema.
@@ -212,6 +213,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
             If None is set, it uses the default value, session local timezone.
+        :param wholeFile: parse one record, which may span multiple lines, per file. If None is
+            set, it uses the default value, ``false``.
 
         >>> df1 = spark.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
@@ -228,7 +231,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
-            timestampFormat=timestampFormat, timeZone=timeZone)
+            timestampFormat=timestampFormat, timeZone=timeZone, wholeFile=wholeFile)
         if isinstance(path, basestring):
             path = [path]
         if type(path) == list:

python/pyspark/sql/streaming.py

Lines changed: 9 additions & 5 deletions
@@ -428,11 +428,13 @@ def load(self, path=None, format=None, schema=None, **options):
     def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
              allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
              allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
-             mode=None, columnNameOfCorruptRecord=None, dateFormat=None,
-             timestampFormat=None, timeZone=None):
+             mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
+             timeZone=None, wholeFile=None):
         """
-        Loads a JSON file stream (`JSON Lines text format or newline-delimited JSON
-        <http://jsonlines.org/>`_) and returns a :class`DataFrame`.
+        Loads a JSON file stream and returns the results as a :class:`DataFrame`.
+
+        Both JSON (one record per file) and `JSON Lines <http://jsonlines.org/>`_
+        (newline-delimited JSON) are supported and can be selected with the `wholeFile` parameter.
 
         If the ``schema`` parameter is not specified, this function goes
         through the input once to determine the input schema.
@@ -483,6 +485,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
            default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSZZ``.
         :param timeZone: sets the string that indicates a timezone to be used to parse timestamps.
            If None is set, it uses the default value, session local timezone.
+        :param wholeFile: parse one record, which may span multiple lines, per file. If None is
+            set, it uses the default value, ``false``.
 
         >>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
         >>> json_sdf.isStreaming
@@ -496,7 +500,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
             allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
             allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
             mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
-            timestampFormat=timestampFormat, timeZone=timeZone)
+            timestampFormat=timestampFormat, timeZone=timeZone, wholeFile=wholeFile)
         if isinstance(path, basestring):
             return self._df(self._jreader.json(path))
         else:

python/pyspark/sql/tests.py

Lines changed: 7 additions & 0 deletions
@@ -439,6 +439,13 @@ def test_udf_with_order_by_and_limit(self):
         res.explain(True)
         self.assertEqual(res.collect(), [Row(id=0, copy=0)])
 
+    def test_wholefile_json(self):
+        from pyspark.sql.types import StringType
+        people1 = self.spark.read.json("python/test_support/sql/people.json")
+        people_array = self.spark.read.json("python/test_support/sql/people_array.json",
+                                            wholeFile=True)
+        self.assertEqual(people1.collect(), people_array.collect())
+
     def test_udf_with_input_file_name(self):
         from pyspark.sql.functions import udf, input_file_name
         from pyspark.sql.types import StringType
python/test_support/sql/people_array.json

Lines changed: 13 additions & 0 deletions

@@ -0,0 +1,13 @@
+[
+  {
+    "name": "Michael"
+  },
+  {
+    "name": "Andy",
+    "age": 30
+  },
+  {
+    "name": "Justin",
+    "age": 19
+  }
+]

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala

Lines changed: 7 additions & 3 deletions
@@ -497,16 +497,20 @@ case class JsonToStruct(
   lazy val parser =
     new JacksonParser(
       schema,
-      "invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
-      new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
+      new JSONOptions(options + ("mode" -> ParseModes.FAIL_FAST_MODE), timeZoneId.get))
 
   override def dataType: DataType = schema
 
   override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
     copy(timeZoneId = Option(timeZoneId))
 
   override def nullSafeEval(json: Any): Any = {
-    try parser.parse(json.toString).headOption.orNull catch {
+    try {
+      parser.parse(
+        json.asInstanceOf[UTF8String],
+        CreateJacksonParser.utf8String,
+        identity[UTF8String]).headOption.orNull
+    } catch {
       case _: SparkSQLJsonProcessingException => null
     }
   }
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/CreateJacksonParser.scala

Lines changed: 46 additions & 0 deletions

@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import java.io.InputStream
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import org.apache.hadoop.io.Text
+
+import org.apache.spark.unsafe.types.UTF8String
+
+private[sql] object CreateJacksonParser extends Serializable {
+  def string(jsonFactory: JsonFactory, record: String): JsonParser = {
+    jsonFactory.createParser(record)
+  }
+
+  def utf8String(jsonFactory: JsonFactory, record: UTF8String): JsonParser = {
+    val bb = record.getByteBuffer
+    assert(bb.hasArray)
+
+    jsonFactory.createParser(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
+  }
+
+  def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
+    jsonFactory.createParser(record.getBytes, 0, record.getLength)
+  }
+
+  def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
+    jsonFactory.createParser(record)
+  }
+}
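For orientation, these factory functions are meant to be handed to `JacksonParser.parse` together with the raw record, as the `JsonToStruct` call site in jsonExpressions.scala above shows. A sketch under that assumption (`jacksonParser` and `record` are illustrative names; the third argument appears to map the record back to a `UTF8String` for corrupt-record output):

```scala
// Sketch only: mirrors the JsonToStruct call site, not code from this commit.
val record = UTF8String.fromString("""{"name": "Andy", "age": 30}""")
val rows = jacksonParser.parse(
  record,                           // the raw record, here a UTF8String
  CreateJacksonParser.utf8String,   // (JsonFactory, UTF8String) => JsonParser
  identity[UTF8String])             // recover the record text if parsing fails
```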

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala

Lines changed: 16 additions & 4 deletions
@@ -31,11 +31,20 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-    @transient private val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String)
+    @transient private val parameters: CaseInsensitiveMap[String],
+    defaultTimeZoneId: String,
+    defaultColumnNameOfCorruptRecord: String)
   extends Logging with Serializable {
 
-  def this(parameters: Map[String, String], defaultTimeZoneId: String) =
-    this(CaseInsensitiveMap(parameters), defaultTimeZoneId)
+  def this(
+      parameters: Map[String, String],
+      defaultTimeZoneId: String,
+      defaultColumnNameOfCorruptRecord: String = "") = {
+    this(
+      CaseInsensitiveMap(parameters),
+      defaultTimeZoneId,
+      defaultColumnNameOfCorruptRecord)
+  }
 
   val samplingRatio =
     parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
@@ -57,7 +66,8 @@
     parameters.get("allowBackslashEscapingAnyCharacter").map(_.toBoolean).getOrElse(false)
   val compressionCodec = parameters.get("compression").map(CompressionCodecs.getCodecClassName)
   private val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
-  val columnNameOfCorruptRecord = parameters.get("columnNameOfCorruptRecord")
+  val columnNameOfCorruptRecord =
+    parameters.getOrElse("columnNameOfCorruptRecord", defaultColumnNameOfCorruptRecord)
 
   val timeZone: TimeZone = TimeZone.getTimeZone(parameters.getOrElse("timeZone", defaultTimeZoneId))
 
@@ -69,6 +79,8 @@
     FastDateFormat.getInstance(
       parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSZZ"), timeZone, Locale.US)
 
+  val wholeFile = parameters.get("wholeFile").map(_.toBoolean).getOrElse(false)
+
   // Parse mode flags
   if (!ParseModes.isValidMode(parseMode)) {
     logWarning(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
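The reworked `JSONOptions` constructor threads a default corrupt-record column name through to the parser and exposes the new `wholeFile` flag. A minimal sketch of how it might be instantiated internally (the concrete values are illustrative, not taken from the diff):

```scala
// Sketch only: exercising the new constructor and the wholeFile flag.
val jsonOptions = new JSONOptions(
  Map("wholeFile" -> "true", "mode" -> "PERMISSIVE"),
  "UTC",               // defaultTimeZoneId
  "_corrupt_record")   // defaultColumnNameOfCorruptRecord

assert(jsonOptions.wholeFile)
assert(jsonOptions.columnNameOfCorruptRecord == "_corrupt_record")
```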
