[SPARK-29347][SQL] Add JSON serialization for external Rows #26013

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Status: Closed · wants to merge 4 commits
108 changes: 106 additions & 2 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -17,12 +17,24 @@

package org.apache.spark.sql

import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}
import java.util.{Base64, TimeZone}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.hashing.MurmurHash3

import org.json4s._
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.{Private, Stable, Unstable}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, Decimal, MapType, StringType, StructType, UserDefinedType}

/**
* @since 1.3.0
@@ -501,4 +513,96 @@ trait Row extends Serializable {
  private def getAnyValAs[T <: AnyVal](i: Int): T =
    if (isNullAt(i)) throw new NullPointerException(s"Value at index $i is null")
    else getAs[T](i)

  /**
   * The compact JSON representation of this row.
   * @since 3.0
   */
  @Unstable
  def json: String = compact(jsonValue)
Member:

@hvanhovell, how about reusing JacksonGenerator in our JSON datasource?

Member:

There's a pretty option for prettyJson too.

Member:

Ah, right, the schema can be unknown.

Contributor Author:

Well, you still need the schema. The main reason for not using JacksonGenerator is that we would need to convert back to an internal row, and that is super slow.

HyukjinKwon (Member) commented on Oct 4, 2019:

Hm, this API already looks pretty slow, though, and I suspect it should not be called on a critical path?
If it is supposed to be used on a critical path, we might rather have to provide an API that builds a conversion function for a given schema (so that we avoid type dispatch for every row).

One rather minor concern is that the JSON representation for a row seems to differ from the JSON datasource's, e.g. https://github.com/apache/spark/pull/26013/files#r331463832 and https://github.com/apache/spark/pull/26013/files#diff-78ce4e47d137bbb0d4350ad732b48d5bR576-R578

Also, the code here duplicates a bit of that logic.
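For illustration, a minimal sketch of the kind of schema-specialized converter being suggested here; makeJsonConverter is a hypothetical helper, not part of this PR, and only a couple of field types are shown:

  import org.apache.spark.sql.Row
  import org.json4s.JsonAST._
  import org.apache.spark.sql.types._

  // Hypothetical: resolve one converter per field up front, so the per-row work
  // avoids re-dispatching on the data type of every value.
  def makeJsonConverter(schema: StructType): Row => JObject = {
    val fieldConverters: Array[Any => JValue] = schema.fields.map { field =>
      field.dataType match {
        case LongType => (v: Any) => if (v == null) JNull else JLong(v.asInstanceOf[Long])
        case StringType => (v: Any) => if (v == null) JNull else JString(v.asInstanceOf[String])
        // ... other supported types elided in this sketch
        case other => (_: Any) => throw new IllegalArgumentException(s"Unsupported type $other")
      }
    }
    (row: Row) => JObject(schema.fieldNames.toList.zipWithIndex.map {
      case (name, i) => name -> fieldConverters(i)(row(i))
    })
  }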

hvanhovell (Contributor Author) commented on Oct 7, 2019:

So, two things to consider here.

I want to use this in StreamingQueryProgress, right? All the JSON serialization there is based on the json4s AST and not on strings (which is what JacksonGenerator produces).

There is a difference between this being slow and what you are suggesting; the latter is extremely inefficient. Let's break that down:

  • Row to InternalRow conversion. You would need to create a converter per row because there is currently no way to safely cache a converter. You can use either ScalaReflection or RowEncoder here; the latter is particularly bad because it uses code generation (which takes on the order of milliseconds and is only weakly cached on the driver).
  • Setting up the JacksonGenerator; again, this is uncached and has to be set up anew for each tuple.
  • Generating the string.

Do you see my point here? Or shall I write a benchmark?
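For context only, a rough sketch (not part of this change) of the per-row setup cost described above, using the internal CatalystTypeConverters API; the JSON-writer setup that would follow is only indicated in a comment:

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}

  // Sketch of the alternative being argued against: every call rebuilds the
  // Row -> InternalRow converter, since nothing is safely cacheable per schema.
  def toInternalRowPerCall(row: Row): InternalRow = {
    val converter = CatalystTypeConverters.createToCatalystConverter(row.schema)
    // A JacksonGenerator (or an encoder from RowEncoder) would also have to be
    // set up here for every single row before any JSON is produced.
    converter(row).asInstanceOf[InternalRow]
  }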

HyukjinKwon (Member) commented on Oct 10, 2019:

As an example, there's one API in Row where we dropped a performance improvement (see #23271).

  @deprecated("This method is deprecated and will be removed in future versions.", "3.0.0")
  def merge(rows: Row*): Row = {
    // TODO: Improve the performance of this if used in performance critical part.
    new GenericRow(rows.flatMap(_.toSeq).toArray)
  }

Do you mind adding @Unstable or @Private to these new APIs, with @since in the Scaladoc, to leave room for future improvement just in case?

Row itself is marked as @Stable, so it is better to note explicitly that this can change in the future. With this, LGTM.

Contributor Author:

I will mark them as @Unstable. @Private is debatable, because this is not really meant as an internal-only API.


  /**
   * The pretty (i.e. indented) JSON representation of this row.
   * @since 3.0
   */
  @Unstable
  def prettyJson: String = pretty(render(jsonValue))

  /**
   * JSON representation of the row.
   *
   * Note that this only supports the data types that are also supported by
   * [[org.apache.spark.sql.catalyst.encoders.RowEncoder]].
   *
   * @return the JSON representation of the row.
   */
  private[sql] def jsonValue: JValue = {
    require(schema != null, "JSON serialization requires a non-null schema.")

    lazy val zoneId = DateTimeUtils.getZoneId(SQLConf.get.sessionLocalTimeZone)
    lazy val dateFormatter = DateFormatter.apply(zoneId)
    lazy val timestampFormatter = TimestampFormatter(zoneId)

    // Convert an iterator of values to a json array
    def iteratorToJsonArray(iterator: Iterator[_], elementType: DataType): JArray = {
      JArray(iterator.map(toJson(_, elementType)).toList)
    }

    // Convert a value to json.
    def toJson(value: Any, dataType: DataType): JValue = (value, dataType) match {
      case (null, _) => JNull
      case (b: Boolean, _) => JBool(b)
      case (b: Byte, _) => JLong(b)
      case (s: Short, _) => JLong(s)
      case (i: Int, _) => JLong(i)
      case (l: Long, _) => JLong(l)
      case (f: Float, _) => JDouble(f)
      case (d: Double, _) => JDouble(d)
      case (d: BigDecimal, _) => JDecimal(d)
      case (d: java.math.BigDecimal, _) => JDecimal(d)
      case (d: Decimal, _) => JDecimal(d.toBigDecimal)
      case (s: String, _) => JString(s)
      case (b: Array[Byte], BinaryType) =>
        JString(Base64.getEncoder.encodeToString(b))
      case (d: LocalDate, _) =>
        JString(dateFormatter.format(DateTimeUtils.localDateToDays(d)))
      case (d: Date, _) =>
        JString(dateFormatter.format(DateTimeUtils.fromJavaDate(d)))
      case (i: Instant, _) =>
        JString(timestampFormatter.format(DateTimeUtils.instantToMicros(i)))
      case (t: Timestamp, _) =>
        JString(timestampFormatter.format(DateTimeUtils.fromJavaTimestamp(t)))
      case (a: Array[_], ArrayType(elementType, _)) =>
        iteratorToJsonArray(a.iterator, elementType)
      case (s: Seq[_], ArrayType(elementType, _)) =>
        iteratorToJsonArray(s.iterator, elementType)
      case (m: Map[String @unchecked, _], MapType(StringType, valueType, _)) =>
Contributor:

Is it really worth having a special format for string-keyed maps?

Contributor Author:

The reason is that it emits more readable JSON. This is similar to the way StreamingQueryProgress renders maps. I can revert this if you feel strongly about it.
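For reference, a small sketch of the two shapes being discussed; the expected output strings mirror the map cases in RowJsonSuite further down:

  import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
  import org.apache.spark.sql.types._

  // String-keyed maps render as a JSON object (keys sorted for stable output) ...
  val stringKeyed = new GenericRowWithSchema(
    Array[Any](Map("a" -> "b", "e" -> null)),
    new StructType().add("m", MapType(StringType, StringType)))
  stringKeyed.json // {"m":{"a":"b","e":null}}

  // ... while any other key type falls back to an array of key/value objects.
  val intKeyed = new GenericRowWithSchema(
    Array[Any](Map(1 -> "b", 3 -> null)),
    new StructType().add("m", MapType(IntegerType, StringType)))
  intKeyed.json // {"m":[{"key":1,"value":"b"},{"key":3,"value":null}]}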

Contributor:

Do we need to convert the JSON string back to a Row? If we do, then I think it's better to keep the ser/de simple. If not, I'm fine with the code here.

Contributor Author:

In its current form it is not really meant to be converted back.

Member:

Could other primitive key types like Int use this format too?

        new JObject(m.toList.sortBy(_._1).map {
          case (k, v) => k -> toJson(v, valueType)
        })
      case (m: Map[_, _], MapType(keyType, valueType, _)) =>
        new JArray(m.iterator.map {
          case (k, v) =>
            new JObject("key" -> toJson(k, keyType) :: "value" -> toJson(v, valueType) :: Nil)
        }.toList)
      case (r: Row, _) => r.jsonValue
      case (v: Any, udt: UserDefinedType[Any @unchecked]) =>
        val dataType = udt.sqlType
        toJson(CatalystTypeConverters.convertToScala(udt.serialize(v), dataType), dataType)
      case _ =>
        throw new IllegalArgumentException(s"Failed to convert value $value " +
          s"(class of ${value.getClass}) with the type of $dataType to JSON.")
    }

    // Convert the row fields to json
    var n = 0
    val elements = new mutable.ListBuffer[JField]
    val len = length
    while (n < len) {
      val field = schema(n)
      elements += (field.name -> toJson(apply(n), field.dataType))
      n += 1
    }
    new JObject(elements.toList)
  }
}
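A brief, illustrative usage sketch of the new API; the compact output matches the row case in RowJsonSuite below, and the pretty layout is approximate:

  import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
  import org.apache.spark.sql.types._

  val schema = new StructType().add("c1", StringType).add("c2", IntegerType)
  val row = new GenericRowWithSchema(Array("1", 2), schema)

  row.json
  // {"c1":"1","c2":2}

  row.prettyJson
  // {
  //   "c1" : "1",
  //   "c2" : 2
  // }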
140 changes: 140 additions & 0 deletions sql/catalyst/src/test/scala/org/apache/spark/sql/RowJsonSuite.scala
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}

import org.json4s.JsonAST.{JArray, JBool, JDecimal, JDouble, JLong, JNull, JObject, JString, JValue}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.encoders.{ExamplePoint, ExamplePointUDT}
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
* Test suite for [[Row]] JSON serialization.
*/
class RowJsonSuite extends SparkFunSuite {
  private val schema = new StructType()
    .add("c1", "string")
    .add("c2", IntegerType)

  private def testJson(name: String, value: Any, dt: DataType, expected: JValue): Unit = {
    test(name) {
      val row = new GenericRowWithSchema(Array(value), new StructType().add("a", dt))
      assert(row.jsonValue === JObject("a" -> expected))
    }
  }

  private def testJson(value: Any, dt: DataType, expected: JValue): Unit = {
    testJson(s"$dt $value", value, dt, expected)
  }

  // Nulls
  private def testJsonNull(dt: DataType, expected: JValue): Unit = {
    testJson(null, dt, JNull)
  }
  testJsonNull(IntegerType, JNull)
  testJsonNull(FloatType, JNull)
  testJsonNull(ArrayType(DoubleType, containsNull = true), JNull)

  // Primitives
  testJson(true, BooleanType, JBool(true))
  testJson(false, BooleanType, JBool(false))
  testJson(23.toByte, ByteType, JLong(23))
  testJson(-126.toByte, ByteType, JLong(-126))
  testJson(20281.toShort, ShortType, JLong(20281))
  testJson(-8752.toShort, ShortType, JLong(-8752))
  testJson(1078231987, IntegerType, JLong(1078231987))
  testJson(-10, IntegerType, JLong(-10))
  testJson(139289832109874199L, LongType, JLong(139289832109874199L))
  testJson(-7873748239973488L, LongType, JLong(-7873748239973488L))
  testJson(10.232e10f, FloatType, JDouble(10.232e10f))
  testJson(9.7e-13f, FloatType, JDouble(9.7e-13f))
  testJson(3.891e98d, DoubleType, JDouble(3.891e98d))
  testJson(-7.8e5d, DoubleType, JDouble(-7.8e5d))
  testJson(BigDecimal("1092.88"), DecimalType(10, 2), JDecimal(BigDecimal("1092.88")))
  testJson(Decimal("782.0003"), DecimalType(7, 4), JDecimal(BigDecimal("782.0003")))
  testJson(new java.math.BigDecimal("-77.89"), DecimalType(4, 2), JDecimal(BigDecimal("-77.89")))
  testJson("hello world", StringType, JString("hello world"))
  testJson("BinaryType", Array('a'.toByte, 'b'.toByte), BinaryType, JString("YWI="))
  testJson(Date.valueOf("2019-04-22"), DateType, JString("2019-04-22"))
  testJson(LocalDate.of(2018, 5, 14), DateType, JString("2018-05-14"))
  testJson(
    Timestamp.valueOf("2017-01-06 10:22:03.00"),
    TimestampType,
    JString("2017-01-06 10:22:03"))
  testJson(
    Timestamp.valueOf("2017-05-30 10:22:03.00").toInstant,
    TimestampType,
    JString("2017-05-30 10:22:03"))

  // Complex types
  testJson(
    "ArrayType(LongType,true)",
    Array(1L, null, 77L),
    ArrayType(LongType, containsNull = true),
    JArray(JLong(1L) :: JNull :: JLong(77L) :: Nil))

  testJson(
    Seq(1, -2, 3),
    ArrayType(IntegerType, containsNull = false),
    JArray(JLong(1) :: JLong(-2) :: JLong(3) :: Nil))

  testJson(
    Map("a" -> "b", "c" -> "d", "e" -> null),
    MapType(StringType, StringType, valueContainsNull = true),
    JObject("a" -> JString("b"), "c" -> JString("d"), "e" -> JNull))

  testJson(
    Map(1 -> "b", 2 -> "d", 3 -> null),
    MapType(IntegerType, StringType, valueContainsNull = true),
    JArray(
      JObject("key" -> JLong(1), "value" -> JString("b")) ::
        JObject("key" -> JLong(2), "value" -> JString("d")) ::
        JObject("key" -> JLong(3), "value" -> JNull) :: Nil))

  testJson(
    new GenericRowWithSchema(Array("1", 2), schema),
    schema,
    JObject("c1" -> JString("1"), "c2" -> JLong(2)))

  testJson(
    "UDT",
    new ExamplePoint(3.4d, 8.98d),
    new ExamplePointUDT,
    JArray(JDouble(3.4d) :: JDouble(8.98d) :: Nil))

test("no schema") {
val e = intercept[IllegalArgumentException] {
Row("a").jsonValue
}
assert(e.getMessage.contains("requires a non-null schema"))
}

test("unsupported type") {
val e = intercept[IllegalArgumentException] {
val row = new GenericRowWithSchema(
Array((1, 2)),
new StructType().add("a", ObjectType(classOf[(Int, Int)])))
row.jsonValue
}
assert(e.getMessage.contains("Failed to convert value"))
}
}