Skip to content

[SPARK-31020][SPARK-31023][SPARK-31025][SPARK-31044][SQL] Support foldable args by from_csv/json and schema_of_csv/json #27804

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 17 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,29 @@ import org.apache.spark.unsafe.types.UTF8String

object ExprUtils {

def evalSchemaExpr(exp: Expression): StructType = {
// Use `DataType.fromDDL` since the type string can be struct<...>.
val dataType = exp match {
case Literal(s, StringType) =>
DataType.fromDDL(s.toString)
case e @ SchemaOfCsv(_: Literal, _) =>
val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String]
DataType.fromDDL(ddlSchema.toString)
case e => throw new AnalysisException(
def evalTypeExpr(exp: Expression): DataType = {
if (exp.foldable) {
exp.eval() match {
case s: UTF8String if s != null => DataType.fromDDL(s.toString)
case _ => throw new AnalysisException(
s"The expression '${exp.sql}' is not a valid schema string.")
}
} else {
throw new AnalysisException(
"Schema should be specified in DDL format as a string literal or output of " +
s"the schema_of_csv function instead of ${e.sql}")
s"the schema_of_json/schema_of_csv functions instead of ${exp.sql}")
}
}

def evalSchemaExpr(exp: Expression): StructType = {
val dataType = evalTypeExpr(exp)
if (!dataType.isInstanceOf[StructType]) {
throw new AnalysisException(
s"Schema should be struct type but got ${dataType.sql}.")
}
dataType.asInstanceOf[StructType]
}

def evalTypeExpr(exp: Expression): DataType = exp match {
case Literal(s, StringType) => DataType.fromDDL(s.toString)
case e @ SchemaOfJson(_: Literal, _) =>
val ddlSchema = e.eval(EmptyRow).asInstanceOf[UTF8String]
DataType.fromDDL(ddlSchema.toString)
case e => throw new AnalysisException(
"Schema should be specified in DDL format as a string literal or output of " +
s"the schema_of_json function instead of ${e.sql}")
}

def convertToMapData(exp: Expression): Map[String, String] = exp match {
case m: CreateMap
if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,14 @@ case class SchemaOfCsv(
@transient
private lazy val csv = child.eval().asInstanceOf[UTF8String]

override def checkInputDataTypes(): TypeCheckResult = child match {
case Literal(s, StringType) if s != null => super.checkInputDataTypes()
case _ => TypeCheckResult.TypeCheckFailure(
s"The input csv should be a string literal and not null; however, got ${child.sql}.")
override def checkInputDataTypes(): TypeCheckResult = {
if (child.foldable && csv != null) {
super.checkInputDataTypes()
} else {
TypeCheckResult.TypeCheckFailure(
"The input csv should be a foldable string expression and not null; " +
s"however, got ${child.sql}.")
}
}

override def eval(v: InternalRow): Any = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,10 +764,14 @@ case class SchemaOfJson(
@transient
private lazy val json = child.eval().asInstanceOf[UTF8String]

override def checkInputDataTypes(): TypeCheckResult = child match {
case Literal(s, StringType) if s != null => super.checkInputDataTypes()
case _ => TypeCheckResult.TypeCheckFailure(
s"The input json should be a string literal and not null; however, got ${child.sql}.")
override def checkInputDataTypes(): TypeCheckResult = {
if (child.foldable && json != null) {
super.checkInputDataTypes()
} else {
TypeCheckResult.TypeCheckFailure(
"The input json should be a foldable string expression and not null; " +
s"however, got ${child.sql}.")
}
}

override def eval(v: InternalRow): Any = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ select from_csv('1', 1)
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Schema should be specified in DDL format as a string literal or output of the schema_of_csv function instead of 1;; line 1 pos 7
The expression '1' is not a valid schema string.;; line 1 pos 7


-- !query
Expand Down Expand Up @@ -91,7 +91,7 @@ select schema_of_csv(null)
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 'schema_of_csv(NULL)' due to data type mismatch: The input csv should be a string literal and not null; however, got NULL.; line 1 pos 7
cannot resolve 'schema_of_csv(NULL)' due to data type mismatch: The input csv should be a foldable string expression and not null; however, got NULL.; line 1 pos 7


-- !query
Expand All @@ -108,7 +108,7 @@ SELECT schema_of_csv(csvField) FROM csvTable
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 'schema_of_csv(csvtable.`csvField`)' due to data type mismatch: The input csv should be a string literal and not null; however, got csvtable.`csvField`.; line 1 pos 7
cannot resolve 'schema_of_csv(csvtable.`csvField`)' due to data type mismatch: The input csv should be a foldable string expression and not null; however, got csvtable.`csvField`.; line 1 pos 7


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ select from_json('{"a":1}', 1)
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of 1;; line 1 pos 7
The expression '1' is not a valid schema string.;; line 1 pos 7


-- !query
Expand Down Expand Up @@ -326,7 +326,7 @@ select schema_of_json(null)
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input json should be a string literal and not null; however, got NULL.; line 1 pos 7
cannot resolve 'schema_of_json(NULL)' due to data type mismatch: The input json should be a foldable string expression and not null; however, got NULL.; line 1 pos 7


-- !query
Expand All @@ -343,7 +343,7 @@ SELECT schema_of_json(jsonField) FROM jsonTable
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 'schema_of_json(jsontable.`jsonField`)' due to data type mismatch: The input json should be a string literal and not null; however, got jsontable.`jsonField`.; line 1 pos 7
cannot resolve 'schema_of_json(jsontable.`jsonField`)' due to data type mismatch: The input json should be a foldable string expression and not null; however, got jsontable.`jsonField`.; line 1 pos 7


-- !query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,30 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
assert(readback(0).getAs[Row](0).getAs[Date](0).getTime >= 0)
}
}

test("support foldable schema by from_csv") {
val options = Map[String, String]().asJava
val schema = concat_ws(",", lit("i int"), lit("s string"))
checkAnswer(
Seq("""1,"a"""").toDS().select(from_csv($"value", schema, options)),
Row(Row(1, "a")))

val errMsg = intercept[AnalysisException] {
Seq(("1", "i int")).toDF("csv", "schema")
.select(from_csv($"csv", $"schema", options)).collect()
}.getMessage
assert(errMsg.contains("Schema should be specified in DDL format as a string literal"))

val errMsg2 = intercept[AnalysisException] {
Seq("1").toDF("csv").select(from_csv($"csv", lit(1), options)).collect()
}.getMessage
assert(errMsg2.contains("The expression '1' is not a valid schema string"))
}

test("schema_of_csv - infers the schema of foldable CSV string") {
val input = concat_ws(",", lit(0.1), lit(1))
checkAnswer(
spark.range(1).select(schema_of_csv(input)),
Seq(Row("struct<_c0:double,_c1:int>")))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
val errMsg1 = intercept[AnalysisException] {
df3.selectExpr("from_json(value, 1)")
}
assert(errMsg1.getMessage.startsWith("Schema should be specified in DDL format as a string"))
assert(errMsg1.getMessage.startsWith("The expression '1' is not a valid schema string"))
val errMsg2 = intercept[AnalysisException] {
df3.selectExpr("""from_json(value, 'time InvalidType')""")
}
Expand Down Expand Up @@ -653,4 +653,25 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
assert(json_tuple_result === len)
}
}

test("support foldable schema by from_json") {
val options = Map[String, String]().asJava
val schema = regexp_replace(lit("dpt_org_id INT, dpt_org_city STRING"), "dpt_org_", "")
checkAnswer(
Seq("""{"id":1,"city":"Moscow"}""").toDS().select(from_json($"value", schema, options)),
Row(Row(1, "Moscow")))

val errMsg = intercept[AnalysisException] {
Seq(("""{"i":1}""", "i int")).toDF("json", "schema")
.select(from_json($"json", $"schema", options)).collect()
}.getMessage
assert(errMsg.contains("Schema should be specified in DDL format as a string literal"))
}

test("schema_of_json - infers the schema of foldable JSON string") {
val input = regexp_replace(lit("""{"item_id": 1, "item_price": 0.1}"""), "item_", "")
checkAnswer(
spark.range(1).select(schema_of_json(input)),
Seq(Row("struct<id:bigint,price:double>")))
}
}