Commit 42de525

[SPARK-11745][SQL] Enable more JSON parsing options
This patch adds the following options to the JSON data source, for dealing with non-standard JSON files:

* `allowComments` (default `false`): ignores Java/C++ style comment in JSON records
* `allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names
* `allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
* `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers (e.g. 00012)

To avoid passing a lot of options throughout the json package, I introduced a new JSONOptions case class to define all JSON config options. Also updated documentation to explain these options.

Scala
![screen shot 2015-11-15 at 6 12 12 pm](https://cloud.githubusercontent.com/assets/323388/11172965/e3ace6ec-8bc4-11e5-805e-2d78f80d0ed6.png)

Python
![screen shot 2015-11-15 at 6 11 28 pm](https://cloud.githubusercontent.com/assets/323388/11172964/e23ed6ee-8bc4-11e5-8216-312f5983acd5.png)

Author: Reynold Xin <rxin@databricks.com>

Closes #9724 from rxin/SPARK-11745.
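For orientation, here is a minimal sketch of how these options are meant to be used from the public API. It assumes an existing `SQLContext` named `sqlContext` and a hypothetical input path, and relies only on the standard `DataFrameReader.option` call rather than any new method added by this patch:

```scala
// Sketch: enabling the new non-standard-JSON options through DataFrameReader.
// `sqlContext` and the input path are assumed for illustration.
val df = sqlContext.read
  .option("allowComments", "true")            // tolerate // and /* */ comments
  .option("allowUnquotedFieldNames", "true")  // accept {field: 1}
  .option("allowNumericLeadingZeros", "true") // accept 00012
  .json("/path/to/nonstandard.json")          // hypothetical path

df.printSchema()
```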
1 parent fd50fa4 commit 42de525

9 files changed: +286 −106 lines changed

python/pyspark/sql/readwriter.py

Lines changed: 10 additions & 0 deletions
@@ -153,6 +153,16 @@ def json(self, path, schema=None):
             or RDD of Strings storing JSON objects.
         :param schema: an optional :class:`StructType` for the input schema.
 
+        You can set the following JSON-specific options to deal with non-standard JSON files:
+            * ``primitivesAsString`` (default ``false``): infers all primitive values as a string \
+                type
+            * ``allowComments`` (default ``false``): ignores Java/C++ style comment in JSON records
+            * ``allowUnquotedFieldNames`` (default ``false``): allows unquoted JSON field names
+            * ``allowSingleQuotes`` (default ``true``): allows single quotes in addition to double \
+                quotes
+            * ``allowNumericLeadingZeros`` (default ``false``): allows leading zeros in numbers \
+                (e.g. 00012)
+
         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
         [('age', 'bigint'), ('name', 'string')]

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 14 additions & 8 deletions
@@ -29,7 +29,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.JSONRelation
+import org.apache.spark.sql.execution.datasources.json.{JSONOptions, JSONRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
@@ -227,6 +227,15 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   * This function goes through the input once to determine the input schema. If you know the
   * schema in advance, use the version that specifies the schema to avoid the extra scan.
   *
+  * You can set the following JSON-specific options to deal with non-standard JSON files:
+  * <li>`primitivesAsString` (default `false`): infers all primitive values as a string type</li>
+  * <li>`allowComments` (default `false`): ignores Java/C++ style comment in JSON records</li>
+  * <li>`allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names</li>
+  * <li>`allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
+  * </li>
+  * <li>`allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers
+  * (e.g. 00012)</li>
+  *
   * @param path input path
   * @since 1.4.0
   */
@@ -255,16 +264,13 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
   * @since 1.4.0
   */
  def json(jsonRDD: RDD[String]): DataFrame = {
-    val samplingRatio = extraOptions.getOrElse("samplingRatio", "1.0").toDouble
-    val primitivesAsString = extraOptions.getOrElse("primitivesAsString", "false").toBoolean
     sqlContext.baseRelationToDataFrame(
       new JSONRelation(
         Some(jsonRDD),
-        samplingRatio,
-        primitivesAsString,
-        userSpecifiedSchema,
-        None,
-        None)(sqlContext)
+        maybeDataSchema = userSpecifiedSchema,
+        maybePartitionSpec = None,
+        userDefinedPartitionColumns = None,
+        parameters = extraOptions.toMap)(sqlContext)
     )
   }
 
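With this change, `json(jsonRDD)` forwards whatever was set via `option(...)` to `JSONRelation` through `extraOptions`, instead of picking out only `samplingRatio` and `primitivesAsString`. A hedged sketch of what that enables, assuming an existing `SparkContext`/`SQLContext` and made-up records:

```scala
// Sketch: reader options now reach JSONRelation when reading from an RDD[String].
// `sc` and `sqlContext` are assumed to exist; the record below is made up.
val jsonRDD = sc.parallelize(Seq(
  """{name: 'Alice', age: 007}"""  // unquoted field, single quotes, leading zeros
))

val df = sqlContext.read
  .option("allowUnquotedFieldNames", "true")
  .option("allowNumericLeadingZeros", "true")
  .json(jsonRDD)
```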
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 1 addition & 16 deletions
@@ -221,22 +221,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 
   private[this] def isTesting: Boolean = sys.props.contains("spark.testing")
 
-  protected def newProjection(
-      expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
-    log.debug(s"Creating Projection: $expressions, inputSchema: $inputSchema")
-    try {
-      GenerateProjection.generate(expressions, inputSchema)
-    } catch {
-      case e: Exception =>
-        if (isTesting) {
-          throw e
-        } else {
-          log.error("Failed to generate projection, fallback to interpret", e)
-          new InterpretedProjection(expressions, inputSchema)
-        }
-    }
-  }
-
   protected def newMutableProjection(
       expressions: Seq[Expression], inputSchema: Seq[Attribute]): () => MutableProjection = {
     log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
@@ -282,6 +266,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       }
     }
   }
+
   /**
    * Creates a row ordering for the given schema, in natural ascending order.
    */

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala

Lines changed: 19 additions & 15 deletions
@@ -25,33 +25,36 @@ import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-private[sql] object InferSchema {
+
+private[json] object InferSchema {
+
   /**
    * Infer the type of a collection of json records in three stages:
    * 1. Infer the type of each record
    * 2. Merge types by choosing the lowest type necessary to cover equal keys
    * 3. Replace any remaining null fields with string, the top type
    */
-  def apply(
+  def infer(
       json: RDD[String],
-      samplingRatio: Double = 1.0,
       columnNameOfCorruptRecords: String,
-      primitivesAsString: Boolean = false): StructType = {
-    require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
-    val schemaData = if (samplingRatio > 0.99) {
+      configOptions: JSONOptions): StructType = {
+    require(configOptions.samplingRatio > 0,
+      s"samplingRatio (${configOptions.samplingRatio}) should be greater than 0")
+    val schemaData = if (configOptions.samplingRatio > 0.99) {
       json
     } else {
-      json.sample(withReplacement = false, samplingRatio, 1)
+      json.sample(withReplacement = false, configOptions.samplingRatio, 1)
     }
 
     // perform schema inference on each row and merge afterwards
     val rootType = schemaData.mapPartitions { iter =>
       val factory = new JsonFactory()
+      configOptions.setJacksonOptions(factory)
       iter.map { row =>
         try {
           Utils.tryWithResource(factory.createParser(row)) { parser =>
             parser.nextToken()
-            inferField(parser, primitivesAsString)
+            inferField(parser, configOptions)
           }
         } catch {
           case _: JsonParseException =>
@@ -71,14 +74,14 @@ private[sql] object InferSchema {
   /**
    * Infer the type of a json document from the parser's token stream
    */
-  private def inferField(parser: JsonParser, primitivesAsString: Boolean): DataType = {
+  private def inferField(parser: JsonParser, configOptions: JSONOptions): DataType = {
     import com.fasterxml.jackson.core.JsonToken._
     parser.getCurrentToken match {
       case null | VALUE_NULL => NullType
 
       case FIELD_NAME =>
         parser.nextToken()
-        inferField(parser, primitivesAsString)
+        inferField(parser, configOptions)
 
       case VALUE_STRING if parser.getTextLength < 1 =>
         // Zero length strings and nulls have special handling to deal
@@ -95,7 +98,7 @@ private[sql] object InferSchema {
         while (nextUntil(parser, END_OBJECT)) {
           builder += StructField(
             parser.getCurrentName,
-            inferField(parser, primitivesAsString),
+            inferField(parser, configOptions),
             nullable = true)
         }
 
@@ -107,14 +110,15 @@ private[sql] object InferSchema {
         // the type as we pass through all JSON objects.
         var elementType: DataType = NullType
         while (nextUntil(parser, END_ARRAY)) {
-          elementType = compatibleType(elementType, inferField(parser, primitivesAsString))
+          elementType = compatibleType(
+            elementType, inferField(parser, configOptions))
         }
 
         ArrayType(elementType)
 
-      case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) if primitivesAsString => StringType
+      case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) if configOptions.primitivesAsString => StringType
 
-      case (VALUE_TRUE | VALUE_FALSE) if primitivesAsString => StringType
+      case (VALUE_TRUE | VALUE_FALSE) if configOptions.primitivesAsString => StringType
 
       case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
         import JsonParser.NumberType._
@@ -178,7 +182,7 @@ private[sql] object InferSchema {
   /**
    * Returns the most general data type for two given data types.
    */
-  private[json] def compatibleType(t1: DataType, t2: DataType): DataType = {
+  def compatibleType(t1: DataType, t2: DataType): DataType = {
     HiveTypeCoercion.findTightestCommonTypeOfTwo(t1, t2).getOrElse {
       // t1 or t2 is a StructType, ArrayType, or an unexpected type.
       (t1, t2) match {
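To make the inference path concrete: `infer` now configures one Jackson `JsonFactory` per partition from the `JSONOptions` and parses each sampled record with it. Below is a small standalone sketch of that pattern, not the production code path; the sample records are invented for illustration:

```scala
import com.fasterxml.jackson.core.JsonFactory

// Illustrative only: mirrors the per-partition setup used by InferSchema.infer.
val configOptions = JSONOptions(allowComments = true, allowUnquotedFieldNames = true)

val factory = new JsonFactory()
configOptions.setJacksonOptions(factory)  // applies the ALLOW_* Jackson features

val sampledRecords = Seq(
  """{name: "a"} // trailing comment""",
  """{name: "b"}"""
)

sampledRecords.foreach { record =>
  val parser = factory.createParser(record)
  try {
    parser.nextToken()  // START_OBJECT; type inference walks the token stream from here
  } finally {
    parser.close()
  }
}
```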
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONOptions.scala

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import com.fasterxml.jackson.core.{JsonParser, JsonFactory}
+
+/**
+ * Options for the JSON data source.
+ *
+ * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
+ */
+case class JSONOptions(
+    samplingRatio: Double = 1.0,
+    primitivesAsString: Boolean = false,
+    allowComments: Boolean = false,
+    allowUnquotedFieldNames: Boolean = false,
+    allowSingleQuotes: Boolean = true,
+    allowNumericLeadingZeros: Boolean = false,
+    allowNonNumericNumbers: Boolean = false) {
+
+  /** Sets config options on a Jackson [[JsonFactory]]. */
+  def setJacksonOptions(factory: JsonFactory): Unit = {
+    factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
+    factory.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, allowUnquotedFieldNames)
+    factory.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, allowSingleQuotes)
+    factory.configure(JsonParser.Feature.ALLOW_NUMERIC_LEADING_ZEROS, allowNumericLeadingZeros)
+    factory.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, allowNonNumericNumbers)
+  }
+}
+
+
+object JSONOptions {
+  def createFromConfigMap(parameters: Map[String, String]): JSONOptions = JSONOptions(
+    samplingRatio =
+      parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0),
+    primitivesAsString =
+      parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false),
+    allowComments =
+      parameters.get("allowComments").map(_.toBoolean).getOrElse(false),
+    allowUnquotedFieldNames =
+      parameters.get("allowUnquotedFieldNames").map(_.toBoolean).getOrElse(false),
+    allowSingleQuotes =
+      parameters.get("allowSingleQuotes").map(_.toBoolean).getOrElse(true),
+    allowNumericLeadingZeros =
+      parameters.get("allowNumericLeadingZeros").map(_.toBoolean).getOrElse(false),
+    allowNonNumericNumbers =
+      parameters.get("allowNonNumericNumbers").map(_.toBoolean).getOrElse(true)
+  )
+}
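A short sketch of how the companion object ties string-keyed reader options to the typed case class; the map below stands in for the `parameters` that `DefaultSource`/`JSONRelation` receive, and the values shown are made up:

```scala
// Illustrative: the kind of Map[String, String] produced by DataFrameReader options.
val parameters = Map(
  "samplingRatio" -> "0.5",
  "primitivesAsString" -> "true",
  "allowNumericLeadingZeros" -> "true"
)

val options = JSONOptions.createFromConfigMap(parameters)
// Keys that are absent fall back to the defaults, e.g. allowSingleQuotes stays true.
assert(options.samplingRatio == 0.5)
assert(options.primitivesAsString)
assert(options.allowSingleQuotes)
```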

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala

Lines changed: 8 additions & 12 deletions
@@ -52,13 +52,9 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
       dataSchema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): HadoopFsRelation = {
-    val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
-    val primitivesAsString = parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
 
     new JSONRelation(
       inputRDD = None,
-      samplingRatio = samplingRatio,
-      primitivesAsString = primitivesAsString,
       maybeDataSchema = dataSchema,
       maybePartitionSpec = None,
       userDefinedPartitionColumns = partitionColumns,
@@ -69,8 +65,6 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
 
 private[sql] class JSONRelation(
     val inputRDD: Option[RDD[String]],
-    val samplingRatio: Double,
-    val primitivesAsString: Boolean,
     val maybeDataSchema: Option[StructType],
     val maybePartitionSpec: Option[PartitionSpec],
     override val userDefinedPartitionColumns: Option[StructType],
@@ -79,6 +73,8 @@ private[sql] class JSONRelation(
     (@transient val sqlContext: SQLContext)
   extends HadoopFsRelation(maybePartitionSpec, parameters) {
 
+  val options: JSONOptions = JSONOptions.createFromConfigMap(parameters)
+
   /** Constraints to be imposed on schema to be stored. */
   private def checkConstraints(schema: StructType): Unit = {
     if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
@@ -109,17 +105,16 @@ private[sql] class JSONRelation(
       classOf[Text]).map(_._2.toString) // get the text line
   }
 
-  override lazy val dataSchema = {
+  override lazy val dataSchema: StructType = {
     val jsonSchema = maybeDataSchema.getOrElse {
       val files = cachedLeafStatuses().filterNot { status =>
         val name = status.getPath.getName
         name.startsWith("_") || name.startsWith(".")
       }.toArray
-      InferSchema(
+      InferSchema.infer(
         inputRDD.getOrElse(createBaseRdd(files)),
-        samplingRatio,
         sqlContext.conf.columnNameOfCorruptRecord,
-        primitivesAsString)
+        options)
     }
     checkConstraints(jsonSchema)
 
@@ -132,10 +127,11 @@ private[sql] class JSONRelation(
       inputPaths: Array[FileStatus],
       broadcastedConf: Broadcast[SerializableConfiguration]): RDD[InternalRow] = {
     val requiredDataSchema = StructType(requiredColumns.map(dataSchema(_)))
-    val rows = JacksonParser(
+    val rows = JacksonParser.parse(
       inputRDD.getOrElse(createBaseRdd(inputPaths)),
       requiredDataSchema,
-      sqlContext.conf.columnNameOfCorruptRecord)
+      sqlContext.conf.columnNameOfCorruptRecord,
+      options)
 
     rows.mapPartitions { iterator =>
       val unsafeProjection = UnsafeProjection.create(requiredDataSchema)
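Since `JSONRelation` now builds its `JSONOptions` from `parameters`, the same options also flow through the generic data source API, reaching both schema inference and row parsing. A minimal sketch, assuming an existing `sqlContext` and a hypothetical path:

```scala
// Sketch: options passed via the data source API arrive in JSONRelation as `parameters`.
// `sqlContext` is assumed to exist; the path is hypothetical.
val df = sqlContext.read
  .format("json")
  .option("samplingRatio", "0.1")  // infer the schema from roughly 10% of the records
  .option("allowComments", "true")
  .load("/path/to/logs.json")
```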
