From b4c4b408a23100abb59319902e9f09d39c8eaca2 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 12 Aug 2020 15:11:00 +0800 Subject: [PATCH] [SPARK-32106][SQL] Implement script transform in sql/core --- .../sql/catalyst/parser/AstBuilder.scala | 49 +++- .../sql/catalyst/parser/PlanParserSuite.scala | 113 ++++++++- .../spark/sql/execution/SparkPlanner.scala | 1 + .../SparkScriptTransformationExec.scala | 89 +++++++ .../spark/sql/execution/SparkSqlParser.scala | 109 +++++---- .../spark/sql/execution/SparkStrategies.scala | 14 ++ .../resources/sql-tests/inputs/transform.sql | 114 +++++++++ .../sql-tests/results/transform.sql.out | 224 ++++++++++++++++++ .../apache/spark/sql/SQLQueryTestSuite.scala | 5 +- .../SparkScriptTransformationSuite.scala | 104 ++++++++ 10 files changed, 762 insertions(+), 60 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala create mode 100644 sql/core/src/test/resources/sql-tests/inputs/transform.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/transform.sql.out create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index fe99a8ea3cc12..bdd9d4fad3e73 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -744,8 +744,30 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging selectClause.hints.asScala.foldRight(withWindow)(withHints) } + // Script Transform's input/output format. + type ScriptIOFormat = + (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) + + protected def getRowFormatDelimited(ctx: RowFormatDelimitedContext): ScriptIOFormat = { + // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema + // expects a seq of pairs in which the old parsers' token names are used as keys. + // Transforming the result of visitRowFormatDelimited would be quite a bit messier than + // retrieving the key value pairs ourselves. + def entry(key: String, value: Token): Seq[(String, String)] = { + Option(value).map(t => key -> t.getText).toSeq + } + + val entries = entry("TOK_TABLEROWFORMATFIELD", ctx.fieldsTerminatedBy) ++ + entry("TOK_TABLEROWFORMATCOLLITEMS", ctx.collectionItemsTerminatedBy) ++ + entry("TOK_TABLEROWFORMATMAPKEYS", ctx.keysTerminatedBy) ++ + entry("TOK_TABLEROWFORMATLINES", ctx.linesSeparatedBy) ++ + entry("TOK_TABLEROWFORMATNULL", ctx.nullDefinedAs) + + (entries, None, Seq.empty, None) + } + /** - * Create a (Hive based) [[ScriptInputOutputSchema]]. + * Create a [[ScriptInputOutputSchema]]. */ protected def withScriptIOSchema( ctx: ParserRuleContext, @@ -754,7 +776,30 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging outRowFormat: RowFormatContext, recordReader: Token, schemaLess: Boolean): ScriptInputOutputSchema = { - throw new ParseException("Script Transform is not supported", ctx) + + def format(fmt: RowFormatContext): ScriptIOFormat = fmt match { + case c: RowFormatDelimitedContext => + getRowFormatDelimited(c) + + case c: RowFormatSerdeContext => + throw new ParseException("TRANSFORM with serde is only supported in hive mode", ctx) + + // SPARK-32106: When there is no definition about format, we return empty result + // to use a built-in default Serde in SparkScriptTransformationExec. + case null => + (Nil, None, Seq.empty, None) + } + + val (inFormat, inSerdeClass, inSerdeProps, reader) = format(inRowFormat) + + val (outFormat, outSerdeClass, outSerdeProps, writer) = format(outRowFormat) + + ScriptInputOutputSchema( + inFormat, outFormat, + inSerdeClass, outSerdeClass, + inSerdeProps, outSerdeProps, + reader, writer, + schemaLess) } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 88afcb10d9c20..8819d24ce419b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.IntegerType +import org.apache.spark.sql.types.{IntegerType, LongType, StringType} /** * Parser test cases for rules defined in [[CatalystSqlParser]] / [[AstBuilder]]. @@ -1031,4 +1031,115 @@ class PlanParserSuite extends AnalysisTest { assertEqual("select a, b from db.c;;;", table("db", "c").select('a, 'b)) assertEqual("select a, b from db.c; ;; ;", table("db", "c").select('a, 'b)) } + + test("SPARK-32106: TRANSFORM plan") { + // verify schema less + assertEqual( + """ + |SELECT TRANSFORM(a, b, c) + |USING 'cat' + |FROM testData + """.stripMargin, + ScriptTransformation( + Seq('a, 'b, 'c), + "cat", + Seq(AttributeReference("key", StringType)(), + AttributeReference("value", StringType)()), + UnresolvedRelation(TableIdentifier("testData")), + ScriptInputOutputSchema(List.empty, List.empty, None, None, + List.empty, List.empty, None, None, true)) + ) + + // verify without output schema + assertEqual( + """ + |SELECT TRANSFORM(a, b, c) + |USING 'cat' AS (a, b, c) + |FROM testData + """.stripMargin, + ScriptTransformation( + Seq('a, 'b, 'c), + "cat", + Seq(AttributeReference("a", StringType)(), + AttributeReference("b", StringType)(), + AttributeReference("c", StringType)()), + UnresolvedRelation(TableIdentifier("testData")), + ScriptInputOutputSchema(List.empty, List.empty, None, None, + List.empty, List.empty, None, None, false))) + + // verify with output schema + assertEqual( + """ + |SELECT TRANSFORM(a, b, c) + |USING 'cat' AS (a int, b string, c long) + |FROM testData + """.stripMargin, + ScriptTransformation( + Seq('a, 'b, 'c), + "cat", + Seq(AttributeReference("a", IntegerType)(), + AttributeReference("b", StringType)(), + AttributeReference("c", LongType)()), + UnresolvedRelation(TableIdentifier("testData")), + ScriptInputOutputSchema(List.empty, List.empty, None, None, + List.empty, List.empty, None, None, false))) + + // verify with ROW FORMAT DELIMETED + assertEqual( + """ + |SELECT TRANSFORM(a, b, c) + |ROW FORMAT DELIMITED + |FIELDS TERMINATED BY '\t' + |COLLECTION ITEMS TERMINATED BY '\u0002' + |MAP KEYS TERMINATED BY '\u0003' + |LINES TERMINATED BY '\n' + |NULL DEFINED AS 'null' + |USING 'cat' AS (a, b, c) + |ROW FORMAT DELIMITED + |FIELDS TERMINATED BY '\t' + |COLLECTION ITEMS TERMINATED BY '\u0004' + |MAP KEYS TERMINATED BY '\u0005' + |LINES TERMINATED BY '\n' + |NULL DEFINED AS 'NULL' + |FROM testData + """.stripMargin, + ScriptTransformation( + Seq('a, 'b, 'c), + "cat", + Seq(AttributeReference("a", StringType)(), + AttributeReference("b", StringType)(), + AttributeReference("c", StringType)()), + UnresolvedRelation(TableIdentifier("testData")), + ScriptInputOutputSchema( + Seq(("TOK_TABLEROWFORMATFIELD", "'\\t'"), + ("TOK_TABLEROWFORMATCOLLITEMS", "'\u0002'"), + ("TOK_TABLEROWFORMATMAPKEYS", "'\u0003'"), + ("TOK_TABLEROWFORMATLINES", "'\\n'"), + ("TOK_TABLEROWFORMATNULL", "'null'")), + Seq(("TOK_TABLEROWFORMATFIELD", "'\\t'"), + ("TOK_TABLEROWFORMATCOLLITEMS", "'\u0004'"), + ("TOK_TABLEROWFORMATMAPKEYS", "'\u0005'"), + ("TOK_TABLEROWFORMATLINES", "'\\n'"), + ("TOK_TABLEROWFORMATNULL", "'NULL'")), None, None, + List.empty, List.empty, None, None, false))) + + // verify with ROW FORMAT SERDE + intercept( + """ + |SELECT TRANSFORM(a, b, c) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' + |WITH SERDEPROPERTIES( + | "separatorChar" = "\t", + | "quoteChar" = "'", + | "escapeChar" = "\\") + |USING 'cat' AS (a, b, c) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' + |WITH SERDEPROPERTIES( + | "separatorChar" = "\t", + | "quoteChar" = "'", + | "escapeChar" = "\\") + |FROM testData + """.stripMargin, + "TRANSFORM with serde is only supported in hive mode") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 895eeedd86b8b..8532bc77a70cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -46,6 +46,7 @@ class SparkPlanner( Window :: JoinSelection :: InMemoryScans :: + SparkScripts :: BasicOperators :: Nil) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala new file mode 100644 index 0000000000000..b87c20e6a5656 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.io._ + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ +import org.apache.spark.util.CircularBuffer + +/** + * Transforms the input by forking and running the specified script. + * + * @param input the set of expression that should be passed to the script. + * @param script the command that should be executed. + * @param output the attributes that are produced by the script. + */ +case class SparkScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema) + extends BaseScriptTransformationExec { + + override def processIterator( + inputIterator: Iterator[InternalRow], + hadoopConf: Configuration): Iterator[InternalRow] = { + + val (outputStream, proc, inputStream, stderrBuffer) = initProc + + val outputProjection = new InterpretedProjection(inputExpressionsWithoutSerde, child.output) + + // This new thread will consume the ScriptTransformation's input rows and write them to the + // external process. That process's output will be read by this current thread. + val writerThread = SparkScriptTransformationWriterThread( + inputIterator.map(outputProjection), + inputExpressionsWithoutSerde.map(_.dataType), + ioschema, + outputStream, + proc, + stderrBuffer, + TaskContext.get(), + hadoopConf + ) + + val outputIterator = + createOutputIteratorWithoutSerde(writerThread, inputStream, proc, stderrBuffer) + + writerThread.start() + + outputIterator + } +} + +case class SparkScriptTransformationWriterThread( + iter: Iterator[InternalRow], + inputSchema: Seq[DataType], + ioSchema: ScriptTransformationIOSchema, + outputStream: OutputStream, + proc: Process, + stderrBuffer: CircularBuffer, + taskContext: TaskContext, + conf: Configuration) + extends BaseScriptTransformationWriterThread { + + override def processRows(): Unit = { + processRowsWithoutSerde() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 129312160b1b7..3cddecc03f3c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution} +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.types.StructType /** @@ -745,64 +746,60 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { "Unsupported operation: Used defined record reader/writer classes.", ctx) } - // Decode and input/output format. - type Format = (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) - def format( - fmt: RowFormatContext, - configKey: String, - defaultConfigValue: String): Format = fmt match { - case c: RowFormatDelimitedContext => - // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema - // expects a seq of pairs in which the old parsers' token names are used as keys. - // Transforming the result of visitRowFormatDelimited would be quite a bit messier than - // retrieving the key value pairs ourselves. - def entry(key: String, value: Token): Seq[(String, String)] = { - Option(value).map(t => key -> t.getText).toSeq - } - val entries = entry("TOK_TABLEROWFORMATFIELD", c.fieldsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATCOLLITEMS", c.collectionItemsTerminatedBy) ++ - entry("TOK_TABLEROWFORMATMAPKEYS", c.keysTerminatedBy) ++ - entry("TOK_TABLEROWFORMATLINES", c.linesSeparatedBy) ++ - entry("TOK_TABLEROWFORMATNULL", c.nullDefinedAs) - - (entries, None, Seq.empty, None) - - case c: RowFormatSerdeContext => - // Use a serde format. - val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c) + if (!conf.getConf(CATALOG_IMPLEMENTATION).equals("hive")) { + super.withScriptIOSchema( + ctx, + inRowFormat, + recordWriter, + outRowFormat, + recordReader, + schemaLess) + } else { + def format( + fmt: RowFormatContext, + configKey: String, + defaultConfigValue: String): ScriptIOFormat = fmt match { + case c: RowFormatDelimitedContext => + getRowFormatDelimited(c) + + case c: RowFormatSerdeContext => + // Use a serde format. + val CatalogStorageFormat(None, None, None, Some(name), _, props) = visitRowFormatSerde(c) + + // SPARK-10310: Special cases LazySimpleSerDe + val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { + Option(conf.getConfString(configKey, defaultConfigValue)) + } else { + None + } + (Seq.empty, Option(name), props.toSeq, recordHandler) + + case null => + // Use default (serde) format. + val name = conf.getConfString("hive.script.serde", + "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") + val props = Seq("field.delim" -> "\t") + val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue)) + (Nil, Option(name), props, recordHandler) + } - // SPARK-10310: Special cases LazySimpleSerDe - val recordHandler = if (name == "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") { - Option(conf.getConfString(configKey, defaultConfigValue)) - } else { - None - } - (Seq.empty, Option(name), props.toSeq, recordHandler) - - case null => - // Use default (serde) format. - val name = conf.getConfString("hive.script.serde", - "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe") - val props = Seq("field.delim" -> "\t") - val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue)) - (Nil, Option(name), props, recordHandler) + val (inFormat, inSerdeClass, inSerdeProps, reader) = + format( + inRowFormat, "hive.script.recordreader", + "org.apache.hadoop.hive.ql.exec.TextRecordReader") + + val (outFormat, outSerdeClass, outSerdeProps, writer) = + format( + outRowFormat, "hive.script.recordwriter", + "org.apache.hadoop.hive.ql.exec.TextRecordWriter") + + ScriptInputOutputSchema( + inFormat, outFormat, + inSerdeClass, outSerdeClass, + inSerdeProps, outSerdeProps, + reader, writer, + schemaLess) } - - val (inFormat, inSerdeClass, inSerdeProps, reader) = - format( - inRowFormat, "hive.script.recordreader", "org.apache.hadoop.hive.ql.exec.TextRecordReader") - - val (outFormat, outSerdeClass, outSerdeProps, writer) = - format( - outRowFormat, "hive.script.recordwriter", - "org.apache.hadoop.hive.ql.exec.TextRecordWriter") - - ScriptInputOutputSchema( - inFormat, outFormat, - inSerdeClass, outSerdeClass, - inSerdeProps, outSerdeProps, - reader, writer, - schemaLess) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index eb32bfcecae7b..ffc7e0803a075 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -589,6 +589,20 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } + object SparkScripts extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case logical.ScriptTransformation(input, script, output, child, ioschema) => + SparkScriptTransformationExec( + input, + script, + output, + planLater(child), + ScriptTransformationIOSchema(ioschema) + ) :: Nil + case _ => Nil + } + } + object BasicOperators extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case d: DataWritingCommand => DataWritingCommandExec(d, planLater(d.query)) :: Nil diff --git a/sql/core/src/test/resources/sql-tests/inputs/transform.sql b/sql/core/src/test/resources/sql-tests/inputs/transform.sql new file mode 100644 index 0000000000000..8610e384d6fab --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/transform.sql @@ -0,0 +1,114 @@ +-- Test data. +CREATE OR REPLACE TEMPORARY VIEW t AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +AS t(a, b, c, d, e, f, g, h, i, j, k, l); + +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t; + +-- with non-exist command +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t; + +-- with non-exist file +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t; + +-- common supported data types between no serde and serde transform +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + USING 'cat' AS ( + a string, + b boolean, + c binary, + d tinyint, + e int, + f smallint, + g long, + h float, + i double, + j decimal(38, 18), + k timestamp, + l date) + FROM t +) tmp; + +-- common supported data types between no serde and serde transform +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + USING 'cat' AS ( + a string, + b string, + c string, + d string, + e string, + f string, + g string, + h string, + i string, + j string, + k string, + l string) + FROM t +) tmp; + +-- SPARK-32388 handle schema less +SELECT TRANSFORM(a) +USING 'cat' +FROM t; + +SELECT TRANSFORM(a, b) +USING 'cat' +FROM t; + +SELECT TRANSFORM(a, b, c) +USING 'cat' +FROM t; + +-- return null when return string incompatible (no serde) +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i) +USING 'cat' AS (a int, b short, c long, d byte, e float, f double, g decimal(38, 18), h date, i timestamp) +FROM VALUES +('a','','1231a','a','213.21a','213.21a','0a.21d','2000-04-01123','1997-0102 00:00:') tmp(a, b, c, d, e, f, g, h, i); + +-- SPARK-28227: transform can't run with aggregation +SELECT TRANSFORM(b, max(a), sum(f)) +USING 'cat' AS (a, b) +FROM t +GROUP BY b; + +-- transform use MAP +MAP a, b USING 'cat' AS (a, b) FROM t; + +-- transform use REDUCE +REDUCE a, b USING 'cat' AS (a, b) FROM t; + +-- transform with defined row format delimit +SELECT TRANSFORM(a, b, c, null) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +USING 'cat' AS (a, b, c, d) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +FROM t; + +SELECT TRANSFORM(a, b, c, null) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +USING 'cat' AS (d) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '||' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +FROM t; diff --git a/sql/core/src/test/resources/sql-tests/results/transform.sql.out b/sql/core/src/test/resources/sql-tests/results/transform.sql.out new file mode 100644 index 0000000000000..744d6384f9c45 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/transform.sql.out @@ -0,0 +1,224 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 15 + + +-- !query +CREATE OR REPLACE TEMPORARY VIEW t AS SELECT * FROM VALUES +('1', true, unhex('537061726B2053514C'), tinyint(1), 1, smallint(100), bigint(1), float(1.0), 1.0, Decimal(1.0), timestamp('1997-01-02'), date('2000-04-01')), +('2', false, unhex('537061726B2053514C'), tinyint(2), 2, smallint(200), bigint(2), float(2.0), 2.0, Decimal(2.0), timestamp('1997-01-02 03:04:05'), date('2000-04-02')), +('3', true, unhex('537061726B2053514C'), tinyint(3), 3, smallint(300), bigint(3), float(3.0), 3.0, Decimal(3.0), timestamp('1997-02-10 17:32:01-08'), date('2000-04-03')) +AS t(a, b, c, d, e, f, g, h, i, j, k, l) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT TRANSFORM(a) +USING 'cat' AS (a) +FROM t +-- !query schema +struct +-- !query output +1 +2 +3 + + +-- !query +SELECT TRANSFORM(a) +USING 'some_non_existent_command' AS (a) +FROM t +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 127. Error: /bin/bash: some_non_existent_command: command not found + + +-- !query +SELECT TRANSFORM(a) +USING 'python some_non_existent_file' AS (a) +FROM t +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkException +Subprocess exited with status 2. Error: python: can't open file 'some_non_existent_file': [Errno 2] No such file or directory + + +-- !query +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + USING 'cat' AS ( + a string, + b boolean, + c binary, + d tinyint, + e int, + f smallint, + g long, + h float, + i double, + j decimal(38, 18), + k timestamp, + l date) + FROM t +) tmp +-- !query schema +struct +-- !query output +1 true Spark SQL 1 1 100 1 1.0 1.0 1.000000000000000000 1997-01-02 00:00:00 2000-04-01 +2 false Spark SQL 2 2 200 2 2.0 2.0 2.000000000000000000 1997-01-02 03:04:05 2000-04-02 +3 true Spark SQL 3 3 300 3 3.0 3.0 3.000000000000000000 1997-02-10 17:32:01 2000-04-03 + + +-- !query +SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM ( + SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l) + USING 'cat' AS ( + a string, + b string, + c string, + d string, + e string, + f string, + g string, + h string, + i string, + j string, + k string, + l string) + FROM t +) tmp +-- !query schema +struct +-- !query output +1 true Spark SQL 1 1 100 1 1.0 1.0 1 1997-01-02 00:00:00 2000-04-01 +2 false Spark SQL 2 2 200 2 2.0 2.0 2 1997-01-02 03:04:05 2000-04-02 +3 true Spark SQL 3 3 300 3 3.0 3.0 3 1997-02-10 17:32:01 2000-04-03 + + +-- !query +SELECT TRANSFORM(a) +USING 'cat' +FROM t +-- !query schema +struct<> +-- !query output +java.lang.ArrayIndexOutOfBoundsException +1 + + +-- !query +SELECT TRANSFORM(a, b) +USING 'cat' +FROM t +-- !query schema +struct +-- !query output +1 true +2 false +3 true + + +-- !query +SELECT TRANSFORM(a, b, c) +USING 'cat' +FROM t +-- !query schema +struct +-- !query output +1 true +2 false +3 true + + +-- !query +SELECT TRANSFORM(a, b, c, d, e, f, g, h, i) +USING 'cat' AS (a int, b short, c long, d byte, e float, f double, g decimal(38, 18), h date, i timestamp) +FROM VALUES +('a','','1231a','a','213.21a','213.21a','0a.21d','2000-04-01123','1997-0102 00:00:') tmp(a, b, c, d, e, f, g, h, i) +-- !query schema +struct +-- !query output +NULL NULL NULL NULL NULL NULL NULL NULL NULL + + +-- !query +SELECT TRANSFORM(b, max(a), sum(f)) +USING 'cat' AS (a, b) +FROM t +GROUP BY b +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException + +mismatched input 'GROUP' expecting {, ';'}(line 4, pos 0) + +== SQL == +SELECT TRANSFORM(b, max(a), sum(f)) +USING 'cat' AS (a, b) +FROM t +GROUP BY b +^^^ + + +-- !query +MAP a, b USING 'cat' AS (a, b) FROM t +-- !query schema +struct +-- !query output +1 true +2 false +3 true + + +-- !query +REDUCE a, b USING 'cat' AS (a, b) FROM t +-- !query schema +struct +-- !query output +1 true +2 false +3 true + + +-- !query +SELECT TRANSFORM(a, b, c, null) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +USING 'cat' AS (a, b, c, d) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +FROM t +-- !query schema +struct +-- !query output +1 | true | +2 | false | + + +-- !query +SELECT TRANSFORM(a, b, c, null) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '|' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +USING 'cat' AS (d) +ROW FORMAT DELIMITED +FIELDS TERMINATED BY '||' +LINES TERMINATED BY '\n' +NULL DEFINED AS 'NULL' +FROM t +-- !query schema +struct +-- !query output +1 +2 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index 33247455b5cdf..56b0fa99740d0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -23,7 +23,7 @@ import java.util.Locale import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal -import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.{SparkConf, SparkException, TestUtils} import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.SQLHelper @@ -258,6 +258,9 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper newLine.startsWith("--") && !newLine.startsWith("--QUERY-DELIMITER") } + // SPARK-32106 Since we add SQL test 'transform.sql' will use `cat` command, + // here we need to check command available + assume(TestUtils.testCommandAvailable("/bin/bash")) val input = fileToString(new File(testCase.inputFile)) val (comments, code) = splitCommentsAndCodes(input) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala new file mode 100644 index 0000000000000..6b20f4cf88645 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkScriptTransformationSuite.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.{SparkException, TestUtils} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.test.SharedSparkSession + +class SparkScriptTransformationSuite extends BaseScriptTransformationSuite with SharedSparkSession { + import testImplicits._ + + override def isHive23OrSpark: Boolean = true + + override def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec = { + SparkScriptTransformationExec( + input = input, + script = script, + output = output, + child = child, + ioschema = ioschema + ) + } + + test("SPARK-32106: TRANSFORM with serde without hive should throw exception") { + assume(TestUtils.testCommandAvailable("/bin/bash")) + withTempView("v") { + val df = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + df.createTempView("v") + + val e = intercept[ParseException] { + sql( + """ + |SELECT TRANSFORM (a) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |USING 'cat' AS (a) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' + |FROM v + """.stripMargin) + }.getMessage + assert(e.contains("TRANSFORM with serde is only supported in hive mode")) + } + } + + test("SPARK-32106: TRANSFORM doesn't support ArrayType/MapType/StructType " + + "as output data type (no serde)") { + assume(TestUtils.testCommandAvailable("/bin/bash")) + // check for ArrayType + val e1 = intercept[SparkException] { + sql( + """ + |SELECT TRANSFORM(a) + |USING 'cat' AS (a array) + |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c) + """.stripMargin).collect() + }.getMessage + assert(e1.contains("SparkScriptTransformation without serde does not support" + + " ArrayType as output data type")) + + // check for MapType + val e2 = intercept[SparkException] { + sql( + """ + |SELECT TRANSFORM(b) + |USING 'cat' AS (b map) + |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c) + """.stripMargin).collect() + }.getMessage + assert(e2.contains("SparkScriptTransformation without serde does not support" + + " MapType as output data type")) + + // check for StructType + val e3 = intercept[SparkException] { + sql( + """ + |SELECT TRANSFORM(c) + |USING 'cat' AS (c struct) + |FROM VALUES (array(1, 1), map('1', 1), struct(1, 'a')) t(a, b, c) + """.stripMargin).collect() + }.getMessage + assert(e3.contains("SparkScriptTransformation without serde does not support" + + " StructType as output data type")) + } +}