From 7466031632c5f1771cad3f3131bc1a3e52be173a Mon Sep 17 00:00:00 2001 From: angerszhu Date: Tue, 22 Dec 2020 11:37:59 +0900 Subject: [PATCH] [SPARK-32106][SQL] Implement script transform in sql/core ### What changes were proposed in this pull request? * Implement `SparkScriptTransformationExec` based on `BaseScriptTransformationExec` * Implement `SparkScriptTransformationWriterThread` based on `BaseScriptTransformationWriterThread` of writing data * Add rule `SparkScripts` to support convert script LogicalPlan to SparkPlan in Spark SQL (without hive mode) * Add `SparkScriptTransformationSuite` test spark spec case * add test in `SQLQueryTestSuite` And we will close #29085 . ### Why are the changes needed? Support user use Script Transform without Hive ### Does this PR introduce _any_ user-facing change? User can use Script Transformation without hive in no serde mode. Such as : **default no serde ** ``` SELECT TRANSFORM(a, b, c) USING 'cat' AS (a int, b string, c long) FROM testData ``` **no serde with spec ROW FORMAT DELIMITED** ``` SELECT TRANSFORM(a, b, c) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY '\u0002' MAP KEYS TERMINATED BY '\u0003' LINES TERMINATED BY '\n' NULL DEFINED AS 'null' USING 'cat' AS (a, b, c) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY '\u0004' MAP KEYS TERMINATED BY '\u0005' LINES TERMINATED BY '\n' NULL DEFINED AS 'NULL' FROM testData ``` ### How was this patch tested? Added UT Closes #29414 from AngersZhuuuu/SPARK-32106-MINOR. Authored-by: angerszhu Signed-off-by: Takeshi Yamamuro --- .../sql/catalyst/parser/AstBuilder.scala | 52 ++- .../sql/catalyst/parser/PlanParserSuite.scala | 113 +++++- .../spark/sql/execution/SparkPlanner.scala | 1 + .../SparkScriptTransformationExec.scala | 91 +++++ .../spark/sql/execution/SparkSqlParser.scala | 115 +++--- .../spark/sql/execution/SparkStrategies.scala | 14 + .../resources/sql-tests/inputs/transform.sql | 195 ++++++++++ .../sql-tests/results/transform.sql.out | 357 ++++++++++++++++++ .../apache/spark/sql/SQLQueryTestSuite.scala | 5 +- .../SparkScriptTransformationSuite.scala | 102 +++++ .../HiveScriptTransformationExec.scala | 2 + 11 files changed, 982 insertions(+), 65 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala create mode 100644 sql/core/src/test/resources/sql-tests/inputs/transform.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/transform.sql.out create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 9c265544f3227..2af84fa079d97 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -743,8 +743,33 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg selectClause.hints.asScala.foldRight(withWindow)(withHints) } + // Script Transform's input/output format. + type ScriptIOFormat = + (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) + + protected def getRowFormatDelimited(ctx: RowFormatDelimitedContext): ScriptIOFormat = { + // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema + // expects a seq of pairs in which the old parsers' token names are used as keys. + // Transforming the result of visitRowFormatDelimited would be quite a bit messier than + // retrieving the key value pairs ourselves. + val entries = entry("TOK_TABLEROWFORMATFIELD", ctx.fieldsTerminatedBy) ++ + entry("TOK_TABLEROWFORMATCOLLITEMS", ctx.collectionItemsTerminatedBy) ++ + entry("TOK_TABLEROWFORMATMAPKEYS", ctx.keysTerminatedBy) ++ + entry("TOK_TABLEROWFORMATNULL", ctx.nullDefinedAs) ++ + Option(ctx.linesSeparatedBy).toSeq.map { token => + val value = string(token) + validate( + value == "\n", + s"LINES TERMINATED BY only supports newline '\\n' right now: $value", + ctx) + "TOK_TABLEROWFORMATLINES" -> value + } + + (entries, None, Seq.empty, None) + } + /** - * Create a (Hive based) [[ScriptInputOutputSchema]]. + * Create a [[ScriptInputOutputSchema]]. */ protected def withScriptIOSchema( ctx: ParserRuleContext, @@ -753,7 +778,30 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg outRowFormat: RowFormatContext, recordReader: Token, schemaLess: Boolean): ScriptInputOutputSchema = { - throw new ParseException("Script Transform is not supported", ctx) + + def format(fmt: RowFormatContext): ScriptIOFormat = fmt match { + case c: RowFormatDelimitedContext => + getRowFormatDelimited(c) + + case c: RowFormatSerdeContext => + throw new ParseException("TRANSFORM with serde is only supported in hive mode", ctx) + + // SPARK-32106: When there is no definition about format, we return empty result + // to use a built-in default Serde in SparkScriptTransformationExec. + case null => + (Nil, None, Seq.empty, None) + } + + val (inFormat, inSerdeClass, inSerdeProps, reader) = format(inRowFormat) + + val (outFormat, outSerdeClass, outSerdeProps, writer) = format(outRowFormat) + + ScriptInputOutputSchema( + inFormat, outFormat, + inSerdeClass, outSerdeClass, + inSerdeProps, outSerdeProps, + reader, writer, + schemaLess) } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 6fef18babedb6..54018198f619d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.IntegerType +import org.apache.spark.sql.types.{IntegerType, LongType, StringType} /** * Parser test cases for rules defined in [[CatalystSqlParser]] / [[AstBuilder]]. @@ -1031,4 +1031,115 @@ class PlanParserSuite extends AnalysisTest { assertEqual("select a, b from db.c;;;", table("db", "c").select('a, 'b)) assertEqual("select a, b from db.c; ;; ;", table("db", "c").select('a, 'b)) } + + test("SPARK-32106: TRANSFORM plan") { + // verify schema less + assertEqual( + """ + |SELECT TRANSFORM(a, b, c) + |USING 'cat' + |FROM testData + """.stripMargin, + ScriptTransformation( + Seq('a, 'b, 'c), + "cat", + Seq(AttributeReference("key", StringType)(), + AttributeReference("value", StringType)()), + UnresolvedRelation(TableIdentifier("testData")), + ScriptInputOutputSchema(List.empty, List.empty, None, None, + List.empty, List.empty, None, None, true)) + ) + + // verify without output schema + assertEqual( + """ + |SELECT TRANSFORM(a, b, c) + |USING 'cat' AS (a, b, c) + |FROM testData + """.stripMargin, + ScriptTransformation( + Seq('a, 'b, 'c), + "cat", + Seq(AttributeReference("a", StringType)(), + AttributeReference("b", StringType)(), + AttributeReference("c", StringType)()), + UnresolvedRelation(TableIdentifier("testData")), + ScriptInputOutputSchema(List.empty, List.empty, None, None, + List.empty, List.empty, None, None, false))) + + // verify with output schema + assertEqual( + """ + |SELECT TRANSFORM(a, b, c) + |USING 'cat' AS (a int, b string, c long) + |FROM testData + """.stripMargin, + ScriptTransformation( + Seq('a, 'b, 'c), + "cat", + Seq(AttributeReference("a", IntegerType)(), + AttributeReference("b", StringType)(), + AttributeReference("c", LongType)()), + UnresolvedRelation(TableIdentifier("testData")), + ScriptInputOutputSchema(List.empty, List.empty, None, None, + List.empty, List.empty, None, None, false))) + + // verify with ROW FORMAT DELIMETED + assertEqual( + """ + |SELECT TRANSFORM(a, b, c) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + | COLLECTION ITEMS TERMINATED BY '\u0002' + | MAP KEYS TERMINATED BY '\u0003' + | LINES TERMINATED BY '\n' + | NULL DEFINED AS 'null' + | USING 'cat' AS (a, b, c) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + | COLLECTION ITEMS TERMINATED BY '\u0004' + | MAP KEYS TERMINATED BY '\u0005' + | LINES TERMINATED BY '\n' + | NULL DEFINED AS 'NULL' + |FROM testData + """.stripMargin, + ScriptTransformation( + Seq('a, 'b, 'c), + "cat", + Seq(AttributeReference("a", StringType)(), + AttributeReference("b", StringType)(), + AttributeReference("c", StringType)()), + UnresolvedRelation(TableIdentifier("testData")), + ScriptInputOutputSchema( + Seq(("TOK_TABLEROWFORMATFIELD", "\t"), + ("TOK_TABLEROWFORMATCOLLITEMS", "\u0002"), + ("TOK_TABLEROWFORMATMAPKEYS", "\u0003"), + ("TOK_TABLEROWFORMATNULL", "null"), + ("TOK_TABLEROWFORMATLINES", "\n")), + Seq(("TOK_TABLEROWFORMATFIELD", "\t"), + ("TOK_TABLEROWFORMATCOLLITEMS", "\u0004"), + ("TOK_TABLEROWFORMATMAPKEYS", "\u0005"), + ("TOK_TABLEROWFORMATNULL", "NULL"), + ("TOK_TABLEROWFORMATLINES", "\n")), None, None, + List.empty, List.empty, None, None, false))) + + // verify with ROW FORMAT SERDE + intercept( + """ + |SELECT TRANSFORM(a, b, c) + | ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' + | WITH SERDEPROPERTIES( + | "separatorChar" = "\t", + | "quoteChar" = "'", + | "escapeChar" = "\\") + | USING 'cat' AS (a, b, c) + | ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' + | WITH SERDEPROPERTIES( + | "separatorChar" = "\t", + | "quoteChar" = "'", + | "escapeChar" = "\\") + |FROM testData + """.stripMargin, + "TRANSFORM with serde is only supported in hive mode") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index c88fcecc9983b..6994aaf47dfba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -43,6 +43,7 @@ class SparkPlanner(val session: SparkSession, val experimentalMethods: Experimen Window :: JoinSelection :: InMemoryScans :: + SparkScripts :: BasicOperators :: Nil) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala new file mode 100644 index 0000000000000..75c91667012a3 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.io._ + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ +import org.apache.spark.util.CircularBuffer + +/** + * Transforms the input by forking and running the specified script. + * + * @param input the set of expression that should be passed to the script. + * @param script the command that should be executed. + * @param output the attributes that are produced by the script. + * @param child logical plan whose output is transformed. + * @param ioschema the class set that defines how to handle input/output data. + */ +case class SparkScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema) + extends BaseScriptTransformationExec { + + override def processIterator( + inputIterator: Iterator[InternalRow], + hadoopConf: Configuration): Iterator[InternalRow] = { + + val (outputStream, proc, inputStream, stderrBuffer) = initProc + + val outputProjection = new InterpretedProjection(inputExpressionsWithoutSerde, child.output) + + // This new thread will consume the ScriptTransformation's input rows and write them to the + // external process. That process's output will be read by this current thread. + val writerThread = SparkScriptTransformationWriterThread( + inputIterator.map(outputProjection), + inputExpressionsWithoutSerde.map(_.dataType), + ioschema, + outputStream, + proc, + stderrBuffer, + TaskContext.get(), + hadoopConf + ) + + val outputIterator = + createOutputIteratorWithoutSerde(writerThread, inputStream, proc, stderrBuffer) + + writerThread.start() + + outputIterator + } +} + +case class SparkScriptTransformationWriterThread( + iter: Iterator[InternalRow], + inputSchema: Seq[DataType], + ioSchema: ScriptTransformationIOSchema, + outputStream: OutputStream, + proc: Process, + stderrBuffer: CircularBuffer, + taskContext: TaskContext, + conf: Configuration) + extends BaseScriptTransformationWriterThread { + + override def processRows(): Unit = { + processRowsWithoutSerde() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 722ca6f992064..e530b4c9407a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution} +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION /** * Concrete parser for Spark SQL statements. @@ -478,70 +479,62 @@ class SparkSqlAstBuilder extends AstBuilder { "Unsupported operation: Used defined record reader/writer classes.", ctx) } - // Decode and input/output format. - type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) - def format( - fmt: RowFormatContext, - configKey: String, - defaultConfigValue: String): Format = fmt match { - case c: RowFormatDelimitedContext => - // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema - // expects a seq of pairs in which the old parsers' token names are used as keys. - // Transforming the result of visitRowFormatDelimited would be quite a bit messier than - // retrieving the key value pairs ourselves. - val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ - entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) ++ - Option(c.linesSeparatedBy).toSeq.map { token => - val value = string(token) - validate( - value == "\n", - s"LINES TERMINATED BY only supports newline '\\n' right now: $value", - c) - "TOK_TABLEROWFORMATLINES" -> value + if (!conf.getConf(CATALOG_IMPLEMENTATION).equals("hive")) { + super.withScriptIOSchema( + ctx, + inRowFormat, + recordWriter, + outRowFormat, + recordReader, + schemaLess) + } else { + def format( + fmt: RowFormatContext, + configKey: String, + defaultConfigValue: String): ScriptIOFormat = fmt match { + case c: RowFormatDelimitedContext => + getRowFormatDelimited(c) + + case c: RowFormatSerdeContext => + // Use a serde format. + val SerdeInfo(None, None, Some(name), props) = visitRowFormatSerde(c) + + // SPARK-10310: Special cases LazySimpleSerDe + val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { + Option(conf.getConfString(configKey, defaultConfigValue)) + } else { + None } + (Seq.empty, Option(name), props.toSeq, recordHandler) + + case null => + // Use default (serde) format. + val name = conf.getConfString("hive.script.serde", + "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") + val props = Seq( + "field.delim" -> "\t", + "serialization.last.column.takes.rest" -> "true") + val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue)) + (Nil, Option(name), props, recordHandler) + } - (entries, None, Seq.empty, None) - - case c: RowFormatSerdeContext => - // Use a serde format. - val SerdeInfo(None, None, Some(name), props) = visitRowFormatSerde(c) - - // SPARK-10310: Special cases LazySimpleSerDe - val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { - Option(conf.getConfString(configKey, defaultConfigValue)) - } else { - None - } - (Seq.empty, Option(name), props.toSeq, recordHandler) - - case null => - // Use default (serde) format. - val name = conf.getConfString("hive.script.serde", - "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") - val props = Seq( - "field.delim" -> "\t", - "serialization.last.column.takes.rest" -> "true") - val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue)) - (Nil, Option(name), props, recordHandler) + val (inFormat, inSerdeClass, inSerdeProps, reader) = + format( + inRowFormat, "hive.script.recordreader", + "org.apache.hadoop.hive.ql.exec.TextRecordReader") + + val (outFormat, outSerdeClass, outSerdeProps, writer) = + format( + outRowFormat, "hive.script.recordwriter", + "org.apache.hadoop.hive.ql.exec.TextRecordWriter") + + ScriptInputOutputSchema( + inFormat, outFormat, + inSerdeClass, outSerdeClass, + inSerdeProps, outSerdeProps, + reader, writer, + schemaLess) } - - val (inFormat, inSerdeClass, inSerdeProps, reader) = - format( - inRowFormat, "hive.script.recordreader", "org.apache.hadoop.hive.ql.exec.TextRecordReader") - - val (outFormat, outSerdeClass, outSerdeProps, writer) = - format( - outRowFormat, "hive.script.recordwriter", - "org.apache.hadoop.hive.ql.exec.TextRecordWriter") - - ScriptInputOutputSchema( - inFormat, outFormat, - inSerdeClass, outSerdeClass, - inSerdeProps, outSerdeProps, - reader, writer, - schemaLess) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index f5f77b03c2b1b..a8d788f59d271 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -594,6 +594,20 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + object SparkScripts extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case logical.ScriptTransformation(input, script, output, child, ioschema) => + SparkScriptTransformationExec( + input, + script, + output, + planLater(child), + ScriptTransformationIOSchema(ioschema) + ) :: Nil + case _ => Nil + } + } + object BasicOperators extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil diff --git a/sql/core/src/test/resources/sql-tests/inputs/transform.sql b/sql/core/src/test/resources/sql-tests/inputs/transform.sql new file mode 100644 index 0000000000000..65b060eca3a62 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/transform.sql @@ -0,0 +1,195 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +AS t(a, b, c, d, e, f, g, h, i, j, k, l); + +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t; + +-- with non-exist command +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t; + +-- with non-exist file +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t; + +-- common supported data types between no serde and serde transform +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + USING 'cat' AS ( + a string, + b boolean, + c binary, + d tinyint, + e int, + f smallint, + g long, + h float, + i double, + j decimal(38, 18), + k timestamp, + l date) + FROM t +) tmp; + +-- common supported data types between no serde and serde transform +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + USING 'cat' AS ( + a string, + b string, + c string, + d string, + e string, + f string, + g string, + h string, + i string, + j string, + k string, + l string) + FROM t +) tmp; + +-- SPARK-32388 handle schema less +SELECT TRANSFORM(a) +USING 'cat' +FROM t; + +SELECT TRANSFORM(a, b) +USING 'cat' +FROM t; + +SELECT TRANSFORM(a, b, c) +USING 'cat' +FROM t; + +-- return null when return string incompatible (no serde) +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i) +USING 'cat' AS (a int, b short, c long, d byte, e float, f double, g decimal(38, 18), h date, i timestamp) +FROM VALUES +('a','','1231a','a','213.21a','213.21a','0a.21d','2000-04-01123','1997-0102 00:00:') tmp(a, b, c, d, e, f, g, h, i); + +-- SPARK-28227: transform can't run with aggregation +SELECT TRANSFORM(b, max(a), sum(f)) +USING 'cat' AS (a, b) +FROM t +GROUP BY b; + +-- transform use MAP +MAP a, b USING 'cat' AS (a, b) FROM t; + +-- transform use REDUCE +REDUCE a, b USING 'cat' AS (a, b) FROM t; + +-- transform with defined row format delimit +SELECT TRANSFORM(a, b, c, null) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY '@' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' +USING 'cat' AS (a, b, c, d) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY '@' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' +FROM t; + +SELECT TRANSFORM(a, b, c, null) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY '@' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' +USING 'cat' AS (d) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY '@' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' +FROM t; + +-- transform with defined row format delimit handle schema with correct type +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' + USING 'cat' AS ( + a string, + b boolean, + c binary, + d tinyint, + e int, + f smallint, + g long, + h float, + i double, + j decimal(38, 18), + k timestamp, + l date) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' + FROM t +) tmp; + +-- transform with defined row format delimit handle schema with wrong type +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' + USING 'cat' AS ( + a string, + b long, + c binary, + d tinyint, + e int, + f smallint, + g long, + h float, + i double, + j decimal(38, 18), + k int, + l long) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' + FROM t +) tmp; + +-- transform with defined row format delimit LINE TERMINATED BY only support '\n' +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '@' + NULL DEFINED AS 'NULL' + USING 'cat' AS ( + a string, + b string, + c string, + d string, + e string, + f string, + g string, + h string, + i string, + j string, + k string, + l string) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '@' + NULL DEFINED AS 'NULL' + FROM t +) tmp; diff --git a/sql/core/src/test/resources/sql-tests/results/transform.sql.out b/sql/core/src/test/resources/sql-tests/results/transform.sql.out new file mode 100644 index 0000000000000..83ab5cb729c24 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/transform.sql.out @@ -0,0 +1,357 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 18 + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW t AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +AS t(a, b, c, d, e, f, g, h, i, j, k, l) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t +-- !query schema +struct +-- !query output +1 +2 +3 + + +-- !query +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 127. Error: /bin/bash: some_non_existent_command: command not found + + +-- !query +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 2. Error: python: can't open file 'some_non_existent_file': [Errno 2] No such file or directory + + +-- !query +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + USING 'cat' AS ( + a string, + b boolean, + c binary, + d tinyint, + e int, + f smallint, + g long, + h float, + i double, + j decimal(38, 18), + k timestamp, + l date) + FROM t +) tmp +-- !query schema +struct +-- !query output +1 true Spark SQL 1 1 100 1 1.0 1.0 1.000000000000000000 1997-01-02 00:00:00 2000-04-01 +2 false Spark SQL 2 2 200 2 2.0 2.0 2.000000000000000000 1997-01-02 03:04:05 2000-04-02 +3 true Spark SQL 3 3 300 3 3.0 3.0 3.000000000000000000 1997-02-10 17:32:01 2000-04-03 + + +-- !query +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + USING 'cat' AS ( + a string, + b string, + c string, + d string, + e string, + f string, + g string, + h string, + i string, + j string, + k string, + l string) + FROM t +) tmp +-- !query schema +struct +-- !query output +1 true Spark SQL 1 1 100 1 1.0 1.0 1 1997-01-02 00:00:00 2000-04-01 +2 false Spark SQL 2 2 200 2 2.0 2.0 2 1997-01-02 03:04:05 2000-04-02 +3 true Spark SQL 3 3 300 3 3.0 3.0 3 1997-02-10 17:32:01 2000-04-03 + + +-- !query +SELECT TRANSFORM(a) +USING 'cat' +FROM t +-- !query schema +struct +-- !query output +1 NULL +2 NULL +3 NULL + + +-- !query +SELECT TRANSFORM(a, b) +USING 'cat' +FROM t +-- !query schema +struct +-- !query output +1 true +2 false +3 true + + +-- !query +SELECT TRANSFORM(a, b, c) +USING 'cat' +FROM t +-- !query schema +struct +-- !query output +1 true +2 false +3 true + + +-- !query +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i) +USING 'cat' AS (a int, b short, c long, d byte, e float, f double, g decimal(38, 18), h date, i timestamp) +FROM VALUES +('a','','1231a','a','213.21a','213.21a','0a.21d','2000-04-01123','1997-0102 00:00:') tmp(a, b, c, d, e, f, g, h, i) +-- !query schema +struct +-- !query output +NULL NULL NULL NULL NULL NULL NULL NULL NULL + + +-- !query +SELECT TRANSFORM(b, max(a), sum(f)) +USING 'cat' AS (a, b) +FROM t +GROUP BY b +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException + +mismatched input 'GROUP' expecting {, ';'}(line 4, pos 0) + +== SQL == +SELECT TRANSFORM(b, max(a), sum(f)) +USING 'cat' AS (a, b) +FROM t +GROUP BY b +^^^ + + +-- !query +MAP a, b USING 'cat' AS (a, b) FROM t +-- !query schema +struct +-- !query output +1 true +2 false +3 true + + +-- !query +REDUCE a, b USING 'cat' AS (a, b) FROM t +-- !query schema +struct +-- !query output +1 true +2 false +3 true + + +-- !query +SELECT TRANSFORM(a, b, c, null) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY '@' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' +USING 'cat' AS (a, b, c, d) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY '@' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' +FROM t +-- !query schema +struct +-- !query output +1 true Spark SQL null +2 false Spark SQL null +3 true Spark SQL null + + +-- !query +SELECT TRANSFORM(a, b, c, null) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY '@' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' +USING 'cat' AS (d) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY '@' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' +FROM t +-- !query schema +struct +-- !query output +1 +2 +3 + + +-- !query +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' + USING 'cat' AS ( + a string, + b boolean, + c binary, + d tinyint, + e int, + f smallint, + g long, + h float, + i double, + j decimal(38, 18), + k timestamp, + l date) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' + FROM t +) tmp +-- !query schema +struct +-- !query output +1 true Spark SQL 1 1 100 1 1.0 1.0 1.000000000000000000 1997-01-02 00:00:00 2000-04-01 +2 false Spark SQL 2 2 200 2 2.0 2.0 2.000000000000000000 1997-01-02 03:04:05 2000-04-02 +3 true Spark SQL 3 3 300 3 3.0 3.0 3.000000000000000000 1997-02-10 17:32:01 2000-04-03 + + +-- !query +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' + USING 'cat' AS ( + a string, + b long, + c binary, + d tinyint, + e int, + f smallint, + g long, + h float, + i double, + j decimal(38, 18), + k int, + l long) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '\n' + NULL DEFINED AS 'NULL' + FROM t +) tmp +-- !query schema +struct +-- !query output +1 NULL Spark SQL 1 1 100 1 1.0 1.0 1.000000000000000000 NULL NULL +2 NULL Spark SQL 2 2 200 2 2.0 2.0 2.000000000000000000 NULL NULL +3 NULL Spark SQL 3 3 300 3 3.0 3.0 3.000000000000000000 NULL NULL + + +-- !query +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '@' + NULL DEFINED AS 'NULL' + USING 'cat' AS ( + a string, + b string, + c string, + d string, + e string, + f string, + g string, + h string, + i string, + j string, + k string, + l string) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '@' + NULL DEFINED AS 'NULL' + FROM t +) tmp +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException + +LINES TERMINATED BY only supports newline '\n' right now: @(line 3, pos 4) + +== SQL == +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + ROW FORMAT DELIMITED +----^^^ + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '@' + NULL DEFINED AS 'NULL' + USING 'cat' AS ( + a string, + b string, + c string, + d string, + e string, + f string, + g string, + h string, + i string, + j string, + k string, + l string) + ROW FORMAT DELIMITED + FIELDS TERMINATED BY ',' + LINES TERMINATED BY '@' + NULL DEFINED AS 'NULL' + FROM t +) tmp diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 02c6fba9725d3..eb2caa61e1590 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -24,7 +24,7 @@ import java.util.Locale import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.{SparkConf, SparkException, TestUtils} import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.SQLHelper @@ -260,6 +260,9 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER") } + // SPARK-32106 Since we add SQL test 'transform.sql' will use `cat` command, + // here we need to check command available + assume(TestUtils.testCommandAvailable("/bin/bash")) val input = fileToString(new File(testCase.inputFile)) val (comments, code) = splitCommentsAndCodes(input) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala new file mode 100644 index 0000000000000..6ff7c5d6d2f3a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.{SparkException, TestUtils} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.test.SharedSparkSession + +class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with SharedSparkSession { + import testImplicits._ + + override def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec = { + SparkScriptTransformationExec( + input = input, + script = script, + output = output, + child = child, + ioschema = ioschema + ) + } + + test("SPARK-32106: TRANSFORM with serde without hive should throw exception") { + assume(TestUtils.testCommandAvailable("/bin/bash")) + withTempView("v") { + val df = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + df.createTempView("v") + + val e = intercept[ParseException] { + sql( + """ + |SELECT TRANSFORM (a) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |USING 'cat' AS (a) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |FROM v + """.stripMargin) + }.getMessage + assert(e.contains("TRANSFORM with serde is only supported in hive mode")) + } + } + + test("SPARK-32106: TRANSFORM doesn't support ArrayType/MapType/StructType " + + "as output data type (no serde)") { + assume(TestUtils.testCommandAvailable("/bin/bash")) + // check for ArrayType + val e1 = intercept[SparkException] { + sql( + """ + |SELECT TRANSFORM(a) + |USING 'cat' AS (a array) + |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c) + """.stripMargin).collect() + }.getMessage + assert(e1.contains("SparkScriptTransformation without serde does not support" + + " ArrayType as output data type")) + + // check for MapType + val e2 = intercept[SparkException] { + sql( + """ + |SELECT TRANSFORM(b) + |USING 'cat' AS (b map) + |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c) + """.stripMargin).collect() + }.getMessage + assert(e2.contains("SparkScriptTransformation without serde does not support" + + " MapType as output data type")) + + // check for StructType + val e3 = intercept[SparkException] { + sql( + """ + |SELECT TRANSFORM(c) + |USING 'cat' AS (c struct) + |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c) + """.stripMargin).collect() + }.getMessage + assert(e3.contains("SparkScriptTransformation without serde does not support" + + " StructType as output data type")) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala index 26baff3d83eec..4b03cff5e8c8e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala @@ -45,6 +45,8 @@ import org.apache.spark.util.{CircularBuffer, Utils} * @param input the set of expression that should be passed to the script. * @param script the command that should be executed. * @param output the attributes that are produced by the script. + * @param child logical plan whose output is transformed. + * @param ioschema the class set that defines how to handle input/output data. */ case class HiveScriptTransformationExec( input: Seq[Expression],