[SPARK-17699] Support for parsing JSON string columns #15274

Closed · wants to merge 3 commits
23 changes: 23 additions & 0 deletions python/pyspark/sql/functions.py
@@ -1706,6 +1706,29 @@ def json_tuple(col, *fields):
return Column(jc)


@since(2.1)
def from_json(col, schema, options={}):
"""
Parses a column containing a JSON string into a :class:`StructType` with the
specified schema. Returns `null` in the case of an unparseable string.

:param col: string column in JSON format
:param schema: a StructType to use when parsing the JSON column
:param options: options to control parsing. Accepts the same options as the JSON datasource.

>>> from pyspark.sql.types import *
>>> data = [(1, '''{"a": 1}''')]
>>> schema = StructType([StructField("a", IntegerType())])
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(from_json(df.value, schema).alias("json")).collect()
[Row(json=Row(a=1))]
"""

sc = SparkContext._active_spark_context
jc = sc._jvm.functions.from_json(_to_java_column(col), schema.json(), options)
return Column(jc)


@since(1.5)
def size(col):
"""
@@ -23,10 +23,12 @@ import scala.util.parsing.combinator.RegexParsers

import com.fasterxml.jackson.core._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions, SparkSQLJsonProcessingException}
import org.apache.spark.sql.catalyst.util.ParseModes
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

@@ -467,3 +469,28 @@ case class JsonTuple(children: Seq[Expression])
}
}

/**
* Converts a JSON input string to a [[StructType]] with the specified schema.
*/
case class JsonToStruct(schema: StructType, options: Map[String, String], child: Expression)
Review comment by @hvanhovell (Contributor), Sep 28, 2016:

Should this implement ExpectsInputTypes?

Reply from the author (Contributor):

Ah, yes, it definitely should. Let me update.

extends Expression with CodegenFallback with ExpectsInputTypes {
override def nullable: Boolean = true

@transient
lazy val parser =
new JacksonParser(
schema,
"invalid", // Not used since we force fail fast. Invalid rows will be set to `null`.
new JSONOptions(options ++ Map("mode" -> ParseModes.FAIL_FAST_MODE)))

override def dataType: DataType = schema
override def children: Seq[Expression] = child :: Nil

override def eval(input: InternalRow): Any = {
try parser.parse(child.eval(input).toString).head catch {
case _: SparkSQLJsonProcessingException => null
}
}

override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
}
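A minimal sketch of exercising the new expression directly, assuming this commit's classpath and the default Expression.eval signature (the no-argument call works here because the child is a Literal):

import org.apache.spark.sql.catalyst.expressions.{JsonToStruct, Literal}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val schema = StructType(StructField("a", IntegerType) :: Nil)

// A well-formed document parses to an InternalRow.
val ok = JsonToStruct(schema, Map.empty, Literal("""{"a": 1}""")).eval()

// A malformed document trips FAILFAST inside JacksonParser; the resulting
// SparkSQLJsonProcessingException is caught in eval() and surfaces as null.
val bad = JsonToStruct(schema, Map.empty, Literal("""{"a" 1}""")).eval()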
@@ -15,16 +15,16 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.json
package org.apache.spark.sql.catalyst.json

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}

/**
* Options for the JSON data source.
* Options for parsing JSON data into Spark SQL rows.
*
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
*/
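Because JSONOptions now lives under catalyst, the expression layer can construct it directly. A small sketch of the shape JsonToStruct passes to JacksonParser; since the class is private[sql], this assumes code compiled inside Spark's sql package tree:

import org.apache.spark.sql.catalyst.json.JSONOptions

// Caller-supplied options plus the forced parse mode, as in JsonToStruct.
val opts = new JSONOptions(Map("allowComments" -> "true", "mode" -> "FAILFAST"))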
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.json
package org.apache.spark.sql.catalyst.json

import java.io.ByteArrayOutputStream

@@ -28,19 +28,22 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.datasources.ParseModes.{DROP_MALFORMED_MODE, PERMISSIVE_MODE}
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils

private[json] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)
private[sql] class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)

/**
* Constructs a parser for a given schema that translates a json string to an [[InternalRow]].
*/
class JacksonParser(
schema: StructType,
columnNameOfCorruptRecord: String,
options: JSONOptions) extends Logging {

import JacksonUtils._
import ParseModes._
import com.fasterxml.jackson.core.JsonToken._

// A `ValueConverter` is responsible for converting a value from `JsonParser`
@@ -65,7 +68,7 @@ class JacksonParser(
private def failedRecord(record: String): Seq[InternalRow] = {
// create a row even if no corrupt record column is present
if (options.failFast) {
throw new RuntimeException(s"Malformed line in FAILFAST mode: $record")
throw new SparkSQLJsonProcessingException(s"Malformed line in FAILFAST mode: $record")
}
if (options.dropMalformed) {
if (!isWarningPrintedForMalformedRecord) {
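The switch from RuntimeException to SparkSQLJsonProcessingException above is what lets callers distinguish a malformed document from a genuine bug. A reduced sketch of the pattern JsonToStruct uses; the helper name is ours, and since the exception is private[sql] this again assumes code inside Spark's sql packages:

import org.apache.spark.sql.catalyst.json.{JacksonParser, SparkSQLJsonProcessingException}

// Only JSON-processing failures become null; any other exception propagates.
def parseOrNull(parser: JacksonParser, json: String): Any =
  try parser.parse(json).head catch {
    case _: SparkSQLJsonProcessingException => null
  }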
@@ -15,11 +15,11 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.json
package org.apache.spark.sql.catalyst.json

import com.fasterxml.jackson.core.{JsonParser, JsonToken}

private object JacksonUtils {
object JacksonUtils {
/**
* Advance the parser until a null or a specific token is found
*/
@@ -15,15 +15,15 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources
package org.apache.spark.sql.catalyst.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.{BZip2Codec, DeflateCodec, GzipCodec, Lz4Codec, SnappyCodec}
import org.apache.hadoop.io.compress._

import org.apache.spark.util.Utils

private[datasources] object CompressionCodecs {
object CompressionCodecs {
private val shortCompressionCodecNames = Map(
"none" -> null,
"uncompressed" -> null,
@@ -15,9 +15,9 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources
package org.apache.spark.sql.catalyst.util

private[datasources] object ParseModes {
object ParseModes {
val PERMISSIVE_MODE = "PERMISSIVE"
val DROP_MALFORMED_MODE = "DROPMALFORMED"
val FAIL_FAST_MODE = "FAILFAST"
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.ParseModes
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -317,4 +319,28 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
JsonTuple(Literal("{\"a\":\"b\nc\"}") :: Literal("a") :: Nil),
InternalRow.fromSeq(Seq(UTF8String.fromString("b\nc"))))
}

test("from_json") {
val jsonData = """{"a": 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStruct(schema, Map.empty, Literal(jsonData)),
InternalRow.fromSeq(1 :: Nil)
)
}

test("from_json - invalid data") {
val jsonData = """{"a" 1}"""
val schema = StructType(StructField("a", IntegerType) :: Nil)
checkEvaluation(
JsonToStruct(schema, Map.empty, Literal(jsonData)),
null
)

// Other modes should still return `null`.
checkEvaluation(
JsonToStruct(schema, Map("mode" -> ParseModes.PERMISSIVE_MODE), Literal(jsonData)),
null
)
}
}
@@ -21,14 +21,15 @@ import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.spark.Partition
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.execution.datasources.json.{InferSchema, JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.datasources.json.InferSchema
import org.apache.spark.sql.types.StructType

/**
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets
import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes}
import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}

private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
extends Logging with Serializable {
@@ -23,7 +23,8 @@ import com.fasterxml.jackson.core._

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -21,8 +21,9 @@ import java.io.Writer

import com.fasterxml.jackson.core._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.JSONOptions
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.types._

@@ -31,6 +31,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -27,6 +27,7 @@ import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{StringType, StructType}
58 changes: 58 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql

import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.{typeTag, TypeTag}
import scala.util.Try
@@ -2818,6 +2819,63 @@ object functions {
JsonTuple(json.expr +: fields.map(Literal.apply))
}

/**
* (Scala-specific) Parses a column containing a JSON string into a [[StructType]] with the
* specified schema. Returns `null` in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
* @param options options to control how the json is parsed. Accepts the same options as the
* json data source.
*
* @group collection_funcs
* @since 2.1.0
*/
def from_json(e: Column, schema: StructType, options: Map[String, String]): Column = withExpr {
JsonToStruct(schema, options, e.expr)
}

/**
* (Java-specific) Parses a column containing a JSON string into a [[StructType]] with the
* specified schema. Returns `null` in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
* @param options options to control how the json is parsed. Accepts the same options as the
* json data source.
*
* @group collection_funcs
* @since 2.1.0
*/
def from_json(e: Column, schema: StructType, options: java.util.Map[String, String]): Column =
from_json(e, schema, options.asScala.toMap)

/**
* Parses a column containing a JSON string into a [[StructType]] with the specified schema.
* Returns `null` in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string
*
* @group collection_funcs
* @since 2.1.0
*/
def from_json(e: Column, schema: StructType): Column =
from_json(e, schema, Map.empty[String, String])

/**
* Parses a column containing a JSON string into a [[StructType]] with the specified schema.
* Returns `null` in the case of an unparseable string.
*
* @param e a string column containing JSON data.
* @param schema the schema to use when parsing the json string, provided as a JSON-format string
* @param options options to control how the json is parsed. Accepts the same options as the
* json data source.
*
* @group collection_funcs
* @since 2.1.0
*/
def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
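A short usage sketch of the new Scala API, mirroring the earlier Python doctest; it assumes an active SparkSession bound to a stable identifier named spark:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import spark.implicits._

val schema = StructType(StructField("a", IntegerType) :: Nil)
val df = Seq((1, """{"a": 1}"""), (2, """{"a" 1}""")).toDF("key", "value")

// The malformed second row parses to null instead of failing the query.
df.select(from_json($"value", schema).alias("json")).show()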

/**
* Returns length of array or map.
*