Commit 98d42e2

This PR adds JSON serialization for Spark external Rows.
This is intended for observable metrics, where `StreamingQueryProgress` contains a map of observed-metrics rows that must be serialized in some cases. A new test suite, `RowJsonSuite`, covers this.
1 parent: 178a1f3 · commit: 98d42e2
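
For a quick sense of the new API, here is a minimal usage sketch (the column names and values are illustrative, not from the patch; `GenericRowWithSchema` is the schema-carrying `Row` implementation the new test suite uses):

```scala
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

// Row.json requires a row that carries a non-null schema.
val schema = new StructType()
  .add("metric", StringType)
  .add("value", LongType)
val row = new GenericRowWithSchema(Array("rows_processed", 42L), schema)

row.json       // compact: {"metric":"rows_processed","value":42}
row.prettyJson // the same JSON, indented
```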

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
sql/catalyst/src/test/scala/org/apache/spark/sql/RowJsonSuite.scala

3 files changed: +251 −2 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala

Lines changed: 97 additions & 1 deletion
@@ -17,12 +17,24 @@
 
 package org.apache.spark.sql
 
+import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDate}
+import java.util.{Base64, TimeZone}
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.hashing.MurmurHash3
 
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.annotation.Stable
+import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, Decimal, MapType, StringType, StructType, UserDefinedType}
 
 /**
  * @since 1.3.0
@@ -501,4 +513,88 @@ trait Row extends Serializable {
   private def getAnyValAs[T <: AnyVal](i: Int): T =
     if (isNullAt(i)) throw new NullPointerException(s"Value at index $i is null")
     else getAs[T](i)
+
+  /** The compact JSON representation of this row. */
+  def json: String = compact(jsonValue)
+
+  /** The pretty (i.e. indented) JSON representation of this row. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  /**
+   * JSON representation of the row.
+   *
+   * Note that this only supports the data types that are also supported by
+   * [[org.apache.spark.sql.catalyst.encoders.RowEncoder]].
+   *
+   * @return the JSON representation of the row.
+   */
+  private[sql] def jsonValue: JValue = {
+    require(schema != null, "JSON serialization requires a non-null schema.")
+
+    lazy val timeZone = TimeZone.getTimeZone(SQLConf.get.sessionLocalTimeZone)
+    lazy val dateFormatter = DateFormatter.apply(timeZone.toZoneId)
+    lazy val timestampFormatter = TimestampFormatter.apply(timeZone.toZoneId)
+
+    // Convert an iterator of values to a JSON array.
+    def iteratorToJsonArray(iterator: Iterator[_], elementType: DataType): JArray = {
+      JArray(iterator.map(toJson(_, elementType)).toList)
+    }
+
+    // Convert a value to JSON.
+    def toJson(value: Any, dataType: DataType): JValue = (value, dataType) match {
+      case (null, _) => JNull
+      case (b: Boolean, _) => JBool(b)
+      case (b: Byte, _) => JLong(b)
+      case (s: Short, _) => JLong(s)
+      case (i: Int, _) => JLong(i)
+      case (l: Long, _) => JLong(l)
+      case (f: Float, _) => JDouble(f)
+      case (d: Double, _) => JDouble(d)
+      case (d: BigDecimal, _) => JDecimal(d)
+      case (d: java.math.BigDecimal, _) => JDecimal(d)
+      case (d: Decimal, _) => JDecimal(d.toBigDecimal)
+      case (s: String, _) => JString(s)
+      case (b: Array[Byte], BinaryType) =>
+        JString(Base64.getEncoder.encodeToString(b))
+      case (d: LocalDate, _) =>
+        JString(dateFormatter.format(DateTimeUtils.localDateToDays(d)))
+      case (d: Date, _) =>
+        JString(dateFormatter.format(DateTimeUtils.fromJavaDate(d)))
+      case (i: Instant, _) =>
+        JString(timestampFormatter.format(DateTimeUtils.instantToMicros(i)))
+      case (t: Timestamp, _) =>
+        JString(timestampFormatter.format(DateTimeUtils.fromJavaTimestamp(t)))
+      case (a: Array[_], ArrayType(elementType, _)) =>
+        iteratorToJsonArray(a.iterator, elementType)
+      case (s: Seq[_], ArrayType(elementType, _)) =>
+        iteratorToJsonArray(s.iterator, elementType)
+      case (m: Map[String @unchecked, _], MapType(StringType, valueType, _)) =>
+        new JObject(m.toList.sortBy(_._1).map {
+          case (k, v) => k -> toJson(v, valueType)
+        })
+      case (m: Map[_, _], MapType(keyType, valueType, _)) =>
+        new JArray(m.iterator.map {
+          case (k, v) =>
+            new JObject("key" -> toJson(k, keyType) :: "value" -> toJson(v, valueType) :: Nil)
+        }.toList)
+      case (r: Row, _) => r.jsonValue
+      case (v: Any, udt: UserDefinedType[Any @unchecked]) =>
+        val dataType = udt.sqlType
+        toJson(CatalystTypeConverters.convertToScala(udt.serialize(v), dataType), dataType)
+      case _ =>
+        throw new IllegalArgumentException(s"Failed to convert value $value " +
+          s"(class of ${value.getClass}) with the type of $dataType to JSON.")
+    }
+
+    // Convert the row's fields to JSON.
+    var n = 0
+    val elements = new mutable.ListBuffer[JField]
+    val len = length
+    while (n < len) {
+      val field = schema(n)
+      elements += (field.name -> toJson(apply(n), field.dataType))
+      n += 1
+    }
+    new JObject(elements.toList)
+  }
 }
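
A short sketch of the map handling above (field names are illustrative): string-keyed maps become JSON objects with entries sorted by key, while maps with any other key type fall back to an array of key/value objects, since JSON object keys must be strings:

```scala
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

val row = new GenericRowWithSchema(
  Array(Map("b" -> 1, "a" -> 2), Map(1 -> "x", 2 -> "y")),
  new StructType()
    .add("strKeys", MapType(StringType, IntegerType))
    .add("intKeys", MapType(IntegerType, StringType)))

// {"strKeys":{"a":2,"b":1},
//  "intKeys":[{"key":1,"value":"x"},{"key":2,"value":"y"}]}
row.json
```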

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 6 additions & 1 deletion
@@ -155,7 +155,12 @@ object SQLConf {
         confGetter.get()()
       }
     } else {
-      confGetter.get()()
+      val conf = existingConf.get()
+      if (conf != null) {
+        conf
+      } else {
+        confGetter.get()()
+      }
     }
   }
 }
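
The practical effect of this change is that `SQLConf.get` now honors a conf installed with `SQLConf.withExistingConf` on this code path, which is what lets the new test suite pin the session time zone. A rough sketch mirroring what the suite does (the timestamp value is illustrative):

```scala
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

val conf = new SQLConf
conf.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "UTC")

val row = new GenericRowWithSchema(
  Array(java.sql.Timestamp.valueOf("2017-01-06 10:22:03")),
  new StructType().add("ts", TimestampType))

SQLConf.withExistingConf(conf) {
  // Inside this block SQLConf.get returns `conf`, so the timestamp
  // field is formatted using the UTC session time zone.
  row.json
}
```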
sql/catalyst/src/test/scala/org/apache/spark/sql/RowJsonSuite.scala

Lines changed: 148 additions & 0 deletions
@@ -0,0 +1,148 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql

import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}

import org.json4s.JsonAST.{JArray, JBool, JDecimal, JDouble, JLong, JNull, JObject, JString, JValue}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.encoders.{ExamplePoint, ExamplePointUDT}
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
 * Test suite for [[Row]] JSON serialization.
 */
class RowJsonSuite extends SparkFunSuite {
  private val schema = new StructType()
    .add("c1", "string")
    .add("c2", IntegerType)

  private val conf: SQLConf = {
    val conf = new SQLConf
    conf.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, "UTC")
    conf
  }

  private def testJson(name: String, value: Any, dt: DataType, expected: JValue): Unit = {
    test(name) {
      SQLConf.withExistingConf(conf) {
        val row = new GenericRowWithSchema(Array(value), new StructType().add("a", dt))
        assert(row.jsonValue === JObject("a" -> expected))
      }
    }
  }

  private def testJson(value: Any, dt: DataType, expected: JValue): Unit = {
    testJson(s"$dt $value", value, dt, expected)
  }

  // Nulls
  private def testJsonNull(dt: DataType, expected: JValue): Unit = {
    testJson(null, dt, expected)
  }
  testJsonNull(IntegerType, JNull)
  testJsonNull(FloatType, JNull)
  testJsonNull(ArrayType(DoubleType, containsNull = true), JNull)

  // Primitives
  testJson(true, BooleanType, JBool(true))
  testJson(false, BooleanType, JBool(false))
  testJson(23.toByte, ByteType, JLong(23))
  testJson(-126.toByte, ByteType, JLong(-126))
  testJson(20281.toShort, ShortType, JLong(20281))
  testJson(-8752.toShort, ShortType, JLong(-8752))
  testJson(1078231987, IntegerType, JLong(1078231987))
  testJson(-10, IntegerType, JLong(-10))
  testJson(139289832109874199L, LongType, JLong(139289832109874199L))
  testJson(-7873748239973488L, LongType, JLong(-7873748239973488L))
  testJson(10.232e10f, FloatType, JDouble(10.232e10f))
  testJson(9.7e-13f, FloatType, JDouble(9.7e-13f))
  testJson(3.891e98d, DoubleType, JDouble(3.891e98d))
  testJson(-7.8e5d, DoubleType, JDouble(-7.8e5d))
  testJson(BigDecimal("1092.88"), DecimalType(10, 2), JDecimal(BigDecimal("1092.88")))
  testJson(Decimal("782.0003"), DecimalType(7, 4), JDecimal(BigDecimal("782.0003")))
  testJson(new java.math.BigDecimal("-77.89"), DecimalType(4, 2), JDecimal(BigDecimal("-77.89")))
  testJson("hello world", StringType, JString("hello world"))
  testJson("BinaryType", Array('a'.toByte, 'b'.toByte), BinaryType, JString("YWI="))
  testJson(Date.valueOf("2019-04-22"), DateType, JString("2019-04-22"))
  testJson(LocalDate.of(2018, 5, 14), DateType, JString("2018-05-14"))
  testJson(
    new Timestamp(Instant.parse("2017-01-06T10:22:03.00Z").toEpochMilli),
    TimestampType,
    JString("2017-01-06 10:22:03"))
  testJson(
    Instant.parse("2017-05-30T10:22:03.00Z"),
    TimestampType,
    JString("2017-05-30 10:22:03"))

  // Complex types
  testJson(
    "ArrayType(LongType,true)",
    Array(1L, null, 77L),
    ArrayType(LongType, containsNull = true),
    JArray(JLong(1L) :: JNull :: JLong(77L) :: Nil))

  testJson(
    Seq(1, -2, 3),
    ArrayType(IntegerType, containsNull = false),
    JArray(JLong(1) :: JLong(-2) :: JLong(3) :: Nil))

  testJson(
    Map("a" -> "b", "c" -> "d", "e" -> null),
    MapType(StringType, StringType, valueContainsNull = true),
    JObject("a" -> JString("b"), "c" -> JString("d"), "e" -> JNull))

  testJson(
    Map(1 -> "b", 2 -> "d", 3 -> null),
    MapType(IntegerType, StringType, valueContainsNull = true),
    JArray(
      JObject("key" -> JLong(1), "value" -> JString("b")) ::
      JObject("key" -> JLong(2), "value" -> JString("d")) ::
      JObject("key" -> JLong(3), "value" -> JNull) :: Nil))

  testJson(
    new GenericRowWithSchema(Array("1", 2), schema),
    schema,
    JObject("c1" -> JString("1"), "c2" -> JLong(2)))

  testJson(
    "UDT",
    new ExamplePoint(3.4d, 8.98d),
    new ExamplePointUDT,
    JArray(JDouble(3.4d) :: JDouble(8.98d) :: Nil))

  test("no schema") {
    val e = intercept[IllegalArgumentException] {
      Row("a").jsonValue
    }
    assert(e.getMessage.contains("requires a non-null schema"))
  }

  test("unsupported type") {
    val e = intercept[IllegalArgumentException] {
      val row = new GenericRowWithSchema(
        Array((1, 2)),
        new StructType().add("a", ObjectType(classOf[(Int, Int)])))
      row.jsonValue
    }
    assert(e.getMessage.contains("Failed to convert value"))
  }
}
