[SPARK-10495] [SQL] Read date values in JSON data stored by Spark 1.5.0. #8806

Closed

wants to merge 2 commits into from
@@ -73,6 +73,38 @@ private[sql] object JacksonGenerator {
valWriter(field.dataType, v)
}
gen.writeEndObject()

// For UDT, udt.serialize will produce SQL types. So, we need the following three cases.
case (ArrayType(ty, _), v: ArrayData) =>
gen.writeStartArray()
v.foreach(ty, (_, value) => valWriter(ty, value))
Contributor: Should probably replace this foreach with a while.

Contributor Author: This foreach is provided by ArrayData.

gen.writeEndArray()
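
For reference, a minimal sketch (not part of the patch) of the while-based iteration the reviewer suggests; ArrayData.foreach itself iterates with an internal while loop, which is what the author's reply is pointing at. The sketch assumes ArrayData's numElements()/isNullAt()/get(ordinal, dataType) accessors and takes the surrounding valWriter closure as a parameter; import paths follow recent Spark and may differ slightly in the 1.5.x tree. The MapData case below could be rewritten the same way.

import com.fasterxml.jackson.core.JsonGenerator
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.DataType

// Hypothetical helper, not part of the patch: iterate an ArrayData with an explicit while
// loop instead of ArrayData.foreach, passing null for null slots just as foreach does.
def writeJsonArray(
    gen: JsonGenerator,
    elementType: DataType,
    array: ArrayData,
    valWriter: (DataType, Any) => Unit): Unit = {
  gen.writeStartArray()
  var i = 0
  val n = array.numElements()
  while (i < n) {
    val value = if (array.isNullAt(i)) null else array.get(i, elementType)
    valWriter(elementType, value)
    i += 1
  }
  gen.writeEndArray()
}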

case (MapType(kt, vt, _), v: MapData) =>
gen.writeStartObject()
v.foreach(kt, vt, { (k, v) =>
Contributor: Same as above.

gen.writeFieldName(k.toString)
valWriter(vt, v)
})
gen.writeEndObject()

case (StructType(ty), v: InternalRow) =>
gen.writeStartObject()
var i = 0
while (i < ty.length) {
val field = ty(i)
val value = v.get(i, field.dataType)
if (value != null) {
gen.writeFieldName(field.name)
valWriter(field.dataType, value)
}
i += 1
}
gen.writeEndObject()

case (dt, v) =>
sys.error(
s"Failed to convert value $v (class of ${v.getClass}}) with the type of $dt to JSON.")
}

valWriter(rowSchema, row)
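
To illustrate the "For UDT, udt.serialize will produce SQL types" comment in the hunk above, here is a hypothetical UDT (not from this patch, similar in spirit to the MyDenseVectorUDT used in the test below) whose serialize returns an ArrayData; because serialization yields catalyst-side values such as ArrayData, MapData, and InternalRow, the generator needs the three cases added here. Method signatures follow the 1.5-era UserDefinedType API and the import paths follow recent Spark, so both may differ slightly.

import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
import org.apache.spark.sql.types._

// Hypothetical example UDT: serialize() hands back a catalyst ArrayData rather than a
// Scala collection, so JacksonGenerator must know how to write ArrayData out as JSON.
private class Point2DUDT extends UserDefinedType[(Double, Double)] {
  override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

  override def serialize(obj: Any): ArrayData = obj match {
    case (x: Double, y: Double) => new GenericArrayData(Array[Any](x, y))
  }

  override def deserialize(datum: Any): (Double, Double) = datum match {
    case a: ArrayData => (a.getDouble(0), a.getDouble(1))
  }

  override def userClass: Class[(Double, Double)] = classOf[(Double, Double)]
}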
@@ -133,6 +165,10 @@ private[sql] object JacksonGenerator {
i += 1
}
gen.writeEndObject()

case (dt, v) =>
sys.error(
s"Failed to convert value $v (class of ${v.getClass}}) with the type of $dt to JSON.")
}

valWriter(rowSchema, row)
@@ -62,10 +62,23 @@ private[sql] object JacksonParser {
// guard the non string type
null

case (VALUE_STRING, BinaryType) =>
parser.getBinaryValue

case (VALUE_STRING, DateType) =>
-DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
+val stringValue = parser.getText
+if (stringValue.contains("-")) {
+// The format of this string will probably be "yyyy-mm-dd".
Contributor: Not sure whether this is still true if we take different locales into consideration. Maybe use a try block here to parse the string as a date and then fallback to int?

+DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
+} else {
+// In Spark 1.5.0, we store the data as number of days since epoch in string.
+// So, we just convert it to Int.
+stringValue.toInt
+}
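
A minimal sketch (not part of the patch) of the try-then-fallback approach the reviewer floats above: attempt the date-string parse first and only fall back to the Spark 1.5.0 days-since-epoch integer encoding if that parse fails. It reuses the same DateTimeUtils calls as the patch; the helper name is made up.

import scala.util.Try
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Hypothetical alternative to the contains("-") check: try the date-string path first,
// then fall back to interpreting the text as a raw days-since-epoch value.
def parseDateField(text: String): Int =
  Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(text).getTime))
    .getOrElse(text.toInt)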

case (VALUE_STRING, TimestampType) =>
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
DateTimeUtils.stringToTime(parser.getText).getTime * 1000L

case (VALUE_NUMBER_INT, TimestampType) =>
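
A side note on the new (VALUE_STRING, BinaryType) branch above: Jackson's getBinaryValue base64-decodes the JSON string, which is how a col1 value such as "YSBzdHJpbmcgaW4gYmluYXJ5" in the test data below decodes back to the bytes of "a string in binary". A small standalone check (not part of the patch), assuming Jackson 2.x on the classpath:

import com.fasterxml.jackson.core.JsonFactory

object BinaryFieldCheck {
  def main(args: Array[String]): Unit = {
    // Walk the parser to the string value of "col1" and read it as base64-encoded bytes.
    val parser = new JsonFactory().createParser("""{"col1":"YSBzdHJpbmcgaW4gYmluYXJ5"}""")
    parser.nextToken() // START_OBJECT
    parser.nextToken() // FIELD_NAME "col1"
    parser.nextToken() // VALUE_STRING
    val bytes = parser.getBinaryValue // decodes using the default base64 variant
    println(new String(bytes, "UTF-8")) // prints: a string in binary
    parser.close()
  }
}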
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.core.JsonFactory
import org.apache.spark.rdd.RDD
import org.scalactic.Tolerance._

-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
@@ -1159,4 +1159,105 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
"SELECT count(a) FROM test_myjson_with_part where d1 = 1"), Row(9))
})
}

test("backward compatibility") {
// This test makes sure our JSON support can read JSON data generated by previous versions
// of Spark, both through the toJSON method and through the JSON data source.
// The data is generated by the following program.
// Here are a few notes:
// - Spark 1.5.0 cannot save timestamp data. So, we manually added a timestamp field (col13)
// in the JSON object.
// - For Spark versions before 1.5.1, we do not generate UDTs. So, we manually added the UDT
// value to JSON objects generated by those Spark versions (col17).
// - If the type is NullType, we do not write data out.

// Create the schema.
val struct =
StructType(
StructField("f1", FloatType, true) ::
StructField("f2", ArrayType(BooleanType), true) :: Nil)

val dataTypes =
Seq(
StringType, BinaryType, NullType, BooleanType,
ByteType, ShortType, IntegerType, LongType,
FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
DateType, TimestampType,
ArrayType(IntegerType), MapType(StringType, LongType), struct,
new MyDenseVectorUDT())
val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
StructField(s"col$index", dataType, nullable = true)
}
val schema = StructType(fields)

val constantValues =
Seq(
"a string in binary".getBytes("UTF-8"),
null,
true,
1.toByte,
2.toShort,
3,
Long.MaxValue,
0.25.toFloat,
0.75,
new java.math.BigDecimal(s"1234.23456"),
new java.math.BigDecimal(s"1.23456"),
java.sql.Date.valueOf("2015-01-01"),
java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
Seq(2, 3, 4),
Map("a string" -> 2000L),
Row(4.75.toFloat, Seq(false, true)),
new MyDenseVector(Array(0.25, 2.25, 4.25)))
val data =
Row.fromSeq(Seq("Spark " + sqlContext.sparkContext.version) ++ constantValues) :: Nil

// Data generated by previous versions.
// scalastyle:off
val existingJSONData =
"""{"col0":"Spark 1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" ::
"""{"col0":"Spark 1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01 23:50:59.123","col14":[2,3,4],"col15":{"a string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}""" :: Nil
// scalastyle:on

// Generate data for the current version.
val df = sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(data, 1), schema)
withTempPath { path =>
df.write.format("json").mode("overwrite").save(path.getCanonicalPath)

// df.toJSON will convert internal rows to external rows first and then generate
// JSON objects. In contrast, df.write.format("json") will write internal rows directly.
val allJSON =
existingJSONData ++
df.toJSON.collect() ++
sparkContext.textFile(path.getCanonicalPath).collect()

Utils.deleteRecursively(path)
sparkContext.parallelize(allJSON, 1).saveAsTextFile(path.getCanonicalPath)

// Read data back with the schema specified.
val col0Values =
Seq(
"Spark 1.2.2",
"Spark 1.3.1",
"Spark 1.3.1",
"Spark 1.4.1",
"Spark 1.4.1",
"Spark 1.5.0",
"Spark 1.5.0",
"Spark " + sqlContext.sparkContext.version,
"Spark " + sqlContext.sparkContext.version)
val expectedResult = col0Values.map { v =>
Row.fromSeq(Seq(v) ++ constantValues)
}
checkAnswer(
sqlContext.read.format("json").schema(schema).load(path.getCanonicalPath),
expectedResult
)
}
}
}
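
One detail worth calling out in the test data above: the last row, written by Spark 1.5.0's JSON data source, stores col12 as the string "16436" instead of "2015-01-01", i.e. the internal days-since-epoch encoding that the new toInt fallback in JacksonParser reads back. A quick standalone check of that arithmetic (not part of the patch, using java.time for the date math):

import java.time.LocalDate
import java.time.temporal.ChronoUnit

object DaysSinceEpochCheck {
  def main(args: Array[String]): Unit = {
    // 2015-01-01 falls 16436 days after the Unix epoch (1970-01-01), matching the
    // "col12":"16436" value in the Spark 1.5.0 row of the test data.
    val days = ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0), LocalDate.of(2015, 1, 1))
    println(days) // 16436
    require(days == 16436L)
  }
}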