Skip to content

Commit a60aea8

Browse files
chenghao-intel authored and marmbrus committed
[SPARK-5683] [SQL] Avoid multiple json generator created
Author: Cheng Hao <hao.cheng@intel.com>

Closes #4468 from chenghao-intel/json and squashes the following commits:

aeb7801 [Cheng Hao] avoid multiple json generator created
1 parent 6195e24 commit a60aea8

File tree

3 files changed

+29
-16
lines changed

3 files changed

+29
-16
lines changed

sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala

Lines changed: 22 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql
1919

20+
import java.io.CharArrayWriter
21+
2022
import scala.language.implicitConversions
2123
import scala.reflect.ClassTag
2224
import scala.collection.JavaConversions._
@@ -380,8 +382,26 @@ private[sql] class DataFrameImpl protected[sql](
380382
override def toJSON: RDD[String] = {
381383
val rowSchema = this.schema
382384
this.mapPartitions { iter =>
383-
val jsonFactory = new JsonFactory()
384-
iter.map(JsonRDD.rowToJSON(rowSchema, jsonFactory))
385+
val writer = new CharArrayWriter()
386+
// create the Generator without separator inserted between 2 records
387+
val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
388+
389+
new Iterator[String] {
390+
override def hasNext() = iter.hasNext
391+
override def next(): String = {
392+
JsonRDD.rowToJSON(rowSchema, gen)(iter.next())
393+
gen.flush()
394+
395+
val json = writer.toString
396+
if (hasNext) {
397+
writer.reset()
398+
} else {
399+
gen.close()
400+
}
401+
402+
json
403+
}
404+
}
385405
}
386406
}
387407

sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala

Lines changed: 3 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -23,8 +23,7 @@ import java.sql.{Date, Timestamp}
2323
import scala.collection.Map
2424
import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
2525

26-
import com.fasterxml.jackson.core.JsonProcessingException
27-
import com.fasterxml.jackson.core.JsonFactory
26+
import com.fasterxml.jackson.core.{JsonGenerator, JsonProcessingException, JsonFactory}
2827
import com.fasterxml.jackson.databind.ObjectMapper
2928

3029
import org.apache.spark.rdd.RDD
@@ -430,14 +429,11 @@ private[sql] object JsonRDD extends Logging {
430429

431430
/** Transforms a single Row to JSON using Jackson
432431
*
433-
* @param jsonFactory a JsonFactory object to construct a JsonGenerator
434432
* @param rowSchema the schema object used for conversion
433+
* @param gen a JsonGenerator object
435434
* @param row The row to convert
436435
*/
437-
private[sql] def rowToJSON(rowSchema: StructType, jsonFactory: JsonFactory)(row: Row): String = {
438-
val writer = new StringWriter()
439-
val gen = jsonFactory.createGenerator(writer)
440-
436+
private[sql] def rowToJSON(rowSchema: StructType, gen: JsonGenerator)(row: Row) = {
441437
def valWriter: (DataType, Any) => Unit = {
442438
case (_, null) | (NullType, _) => gen.writeNull()
443439
case (StringType, v: String) => gen.writeString(v)
@@ -479,8 +475,5 @@ private[sql] object JsonRDD extends Logging {
479475
}
480476

481477
valWriter(rowSchema, row)
482-
gen.close()
483-
writer.toString
484478
}
485-
486479
}

sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -824,8 +824,8 @@ class JsonSuite extends QueryTest {
824824
df1.registerTempTable("applySchema1")
825825
val df2 = df1.toDataFrame
826826
val result = df2.toJSON.collect()
827-
assert(result(0) == "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
828-
assert(result(3) == "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
827+
assert(result(0) === "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
828+
assert(result(3) === "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
829829

830830
val schema2 = StructType(
831831
StructField("f1", StructType(
@@ -846,8 +846,8 @@ class JsonSuite extends QueryTest {
846846
val df4 = df3.toDataFrame
847847
val result2 = df4.toJSON.collect()
848848

849-
assert(result2(1) == "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
850-
assert(result2(3) == "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
849+
assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
850+
assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
851851

852852
val jsonDF = jsonRDD(primitiveFieldAndType)
853853
val primTable = jsonRDD(jsonDF.toJSON)

0 commit comments

Comments
 (0)