[SPARK-32106][SQL] Implement script transform in sql/core
### What changes were proposed in this pull request?

 * Implement `SparkScriptTransformationExec` based on `BaseScriptTransformationExec`
 * Implement `SparkScriptTransformationWriterThread` based on `BaseScriptTransformationWriterThread` to write data to the launched script process
 * Add the planner rule `SparkScripts` to convert a script-transformation `LogicalPlan` into a `SparkPlan` in Spark SQL (without Hive mode); a sketch follows below
 * Add `SparkScriptTransformationSuite` to test Spark-specific cases
 * Add tests in `SQLQueryTestSuite`

This also closes #29085.
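
For reference, a minimal sketch of the kind of planner rule `SparkScripts` introduces. This is not quoted from the diff: the `SparkStrategy` base class, the field list of the logical `ScriptTransformation` node, and the `ScriptTransformationIOSchema(...)` conversion are assumptions based on Spark 3.1-era internals.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ScriptTransformation}
import org.apache.spark.sql.execution.{ScriptTransformationIOSchema, SparkPlan, SparkScriptTransformationExec, SparkStrategy}

// Hypothetical sketch: match the logical ScriptTransformation node and plan it
// as the new SparkScriptTransformationExec physical node.
object SparkScripts extends SparkStrategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case ScriptTransformation(input, script, output, child, ioschema) =>
      SparkScriptTransformationExec(
        input, script, output,
        planLater(child),                       // defer planning of the child
        ScriptTransformationIOSchema(ioschema)) // wrap the parsed I/O schema
        :: Nil
    case _ => Nil
  }
}
```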

### Why are the changes needed?
Allow users to run script transformation without Hive.

### Does this PR introduce _any_ user-facing change?
Users can now run script transformation without Hive in no-serde mode. For example:

**Default (no serde):**
```
SELECT TRANSFORM(a, b, c)
USING 'cat' AS (a int, b string, c long)
FROM testData
```
**No serde with an explicit ROW FORMAT DELIMITED:**
```
SELECT TRANSFORM(a, b, c)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY '\u0002'
MAP KEYS TERMINATED BY '\u0003'
LINES TERMINATED BY '\n'
NULL DEFINED AS 'null'
USING 'cat' AS (a, b, c)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
COLLECTION ITEMS TERMINATED BY '\u0004'
MAP KEYS TERMINATED BY '\u0005'
LINES TERMINATED BY '\n'
NULL DEFINED AS 'NULL'
FROM testData
```
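
As a rough end-to-end illustration, this can be driven from Scala as below. The table contents are made up and the snippet assumes a local environment where the `cat` binary is on the PATH; only the SQL mirrors the examples above.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: a local session for demonstration purposes.
val spark = SparkSession.builder()
  .appName("script-transform-demo")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical stand-in for the testData relation used in the examples.
Seq((1, "x", 10L), (2, "y", 20L))
  .toDF("a", "b", "c")
  .createOrReplaceTempView("testData")

// No-serde mode: rows are fed to 'cat' as delimited text and its stdout is
// parsed back using the declared output schema (a int, b string, c long).
spark.sql(
  """SELECT TRANSFORM(a, b, c)
    |USING 'cat' AS (a int, b string, c long)
    |FROM testData
    |""".stripMargin).show()
```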

### How was this patch tested?
Added unit tests.

Closes #29414 from AngersZhuuuu/SPARK-32106-MINOR.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
AngersZhuuuu authored and maropu committed Dec 22, 2020
1 parent f62e957 commit 7466031
Showing 11 changed files with 982 additions and 65 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -743,8 +743,33 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging
selectClause.hints.asScala.foldRight(withWindow)(withHints)
}

  // Script Transform's input/output format.
  type ScriptIOFormat =
    (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String])

  protected def getRowFormatDelimited(ctx: RowFormatDelimitedContext): ScriptIOFormat = {
    // TODO we should use the visitRowFormatDelimited function here. However HiveScriptIOSchema
    // expects a seq of pairs in which the old parsers' token names are used as keys.
    // Transforming the result of visitRowFormatDelimited would be quite a bit messier than
    // retrieving the key value pairs ourselves.
    val entries = entry("TOK_TABLEROWFORMATFIELD", ctx.fieldsTerminatedBy) ++
      entry("TOK_TABLEROWFORMATCOLLITEMS", ctx.collectionItemsTerminatedBy) ++
      entry("TOK_TABLEROWFORMATMAPKEYS", ctx.keysTerminatedBy) ++
      entry("TOK_TABLEROWFORMATNULL", ctx.nullDefinedAs) ++
      Option(ctx.linesSeparatedBy).toSeq.map { token =>
        val value = string(token)
        validate(
          value == "\n",
          s"LINES TERMINATED BY only supports newline '\\n' right now: $value",
          ctx)
        "TOK_TABLEROWFORMATLINES" -> value
      }

    (entries, None, Seq.empty, None)
  }
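
To make the `ScriptIOFormat` shape concrete, the input-side `ROW FORMAT DELIMITED` clause from the PR description would yield roughly the following tuple; the token keys match the parser test expectations further down. This is an illustrative sketch, not code from the diff.

```scala
// Sketch of the ScriptIOFormat tuple built by getRowFormatDelimited for:
//   FIELDS TERMINATED BY '\t'  COLLECTION ITEMS TERMINATED BY '\u0002'
//   MAP KEYS TERMINATED BY '\u0003'  LINES TERMINATED BY '\n'
//   NULL DEFINED AS 'null'
val inputSideFormat: (Seq[(String, String)], Option[String], Seq[(String, String)], Option[String]) = (
  Seq(
    "TOK_TABLEROWFORMATFIELD" -> "\t",
    "TOK_TABLEROWFORMATCOLLITEMS" -> "\u0002",
    "TOK_TABLEROWFORMATMAPKEYS" -> "\u0003",
    "TOK_TABLEROWFORMATNULL" -> "null",
    "TOK_TABLEROWFORMATLINES" -> "\n"),
  None,      // no serde class in delimited mode
  Seq.empty, // no serde properties
  None)      // no record reader/writer entry
```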

  /**
-  * Create a (Hive based) [[ScriptInputOutputSchema]].
+  * Create a [[ScriptInputOutputSchema]].
   */
  protected def withScriptIOSchema(
      ctx: ParserRuleContext,
@@ -753,7 +778,30 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging
      outRowFormat: RowFormatContext,
      recordReader: Token,
      schemaLess: Boolean): ScriptInputOutputSchema = {
-    throw new ParseException("Script Transform is not supported", ctx)

    def format(fmt: RowFormatContext): ScriptIOFormat = fmt match {
      case c: RowFormatDelimitedContext =>
        getRowFormatDelimited(c)

      case c: RowFormatSerdeContext =>
        throw new ParseException("TRANSFORM with serde is only supported in hive mode", ctx)

      // SPARK-32106: When there is no definition about format, we return empty result
      // to use a built-in default Serde in SparkScriptTransformationExec.
      case null =>
        (Nil, None, Seq.empty, None)
    }

    val (inFormat, inSerdeClass, inSerdeProps, reader) = format(inRowFormat)

    val (outFormat, outSerdeClass, outSerdeProps, writer) = format(outRowFormat)

    ScriptInputOutputSchema(
      inFormat, outFormat,
      inSerdeClass, outSerdeClass,
      inSerdeProps, outSerdeProps,
      reader, writer,
      schemaLess)
  }

/**
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}

/**
* Parser test cases for rules defined in [[CatalystSqlParser]] / [[AstBuilder]].
@@ -1031,4 +1031,115 @@ class PlanParserSuite extends AnalysisTest {
assertEqual("select a, b from db.c;;;", table("db", "c").select('a, 'b))
assertEqual("select a, b from db.c; ;; ;", table("db", "c").select('a, 'b))
}

test("SPARK-32106: TRANSFORM plan") {
// verify schema less
assertEqual(
"""
|SELECT TRANSFORM(a, b, c)
|USING 'cat'
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
"cat",
Seq(AttributeReference("key", StringType)(),
AttributeReference("value", StringType)()),
UnresolvedRelation(TableIdentifier("testData")),
ScriptInputOutputSchema(List.empty, List.empty, None, None,
List.empty, List.empty, None, None, true))
)

// verify without output schema
assertEqual(
"""
|SELECT TRANSFORM(a, b, c)
|USING 'cat' AS (a, b, c)
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
"cat",
Seq(AttributeReference("a", StringType)(),
AttributeReference("b", StringType)(),
AttributeReference("c", StringType)()),
UnresolvedRelation(TableIdentifier("testData")),
ScriptInputOutputSchema(List.empty, List.empty, None, None,
List.empty, List.empty, None, None, false)))

// verify with output schema
assertEqual(
"""
|SELECT TRANSFORM(a, b, c)
|USING 'cat' AS (a int, b string, c long)
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
"cat",
Seq(AttributeReference("a", IntegerType)(),
AttributeReference("b", StringType)(),
AttributeReference("c", LongType)()),
UnresolvedRelation(TableIdentifier("testData")),
ScriptInputOutputSchema(List.empty, List.empty, None, None,
List.empty, List.empty, None, None, false)))

// verify with ROW FORMAT DELIMITED
assertEqual(
"""
|SELECT TRANSFORM(a, b, c)
| ROW FORMAT DELIMITED
| FIELDS TERMINATED BY '\t'
| COLLECTION ITEMS TERMINATED BY '\u0002'
| MAP KEYS TERMINATED BY '\u0003'
| LINES TERMINATED BY '\n'
| NULL DEFINED AS 'null'
| USING 'cat' AS (a, b, c)
| ROW FORMAT DELIMITED
| FIELDS TERMINATED BY '\t'
| COLLECTION ITEMS TERMINATED BY '\u0004'
| MAP KEYS TERMINATED BY '\u0005'
| LINES TERMINATED BY '\n'
| NULL DEFINED AS 'NULL'
|FROM testData
""".stripMargin,
ScriptTransformation(
Seq('a, 'b, 'c),
"cat",
Seq(AttributeReference("a", StringType)(),
AttributeReference("b", StringType)(),
AttributeReference("c", StringType)()),
UnresolvedRelation(TableIdentifier("testData")),
ScriptInputOutputSchema(
Seq(("TOK_TABLEROWFORMATFIELD", "\t"),
("TOK_TABLEROWFORMATCOLLITEMS", "\u0002"),
("TOK_TABLEROWFORMATMAPKEYS", "\u0003"),
("TOK_TABLEROWFORMATNULL", "null"),
("TOK_TABLEROWFORMATLINES", "\n")),
Seq(("TOK_TABLEROWFORMATFIELD", "\t"),
("TOK_TABLEROWFORMATCOLLITEMS", "\u0004"),
("TOK_TABLEROWFORMATMAPKEYS", "\u0005"),
("TOK_TABLEROWFORMATNULL", "NULL"),
("TOK_TABLEROWFORMATLINES", "\n")), None, None,
List.empty, List.empty, None, None, false)))

// verify with ROW FORMAT SERDE
intercept(
"""
|SELECT TRANSFORM(a, b, c)
| ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
| WITH SERDEPROPERTIES(
| "separatorChar" = "\t",
| "quoteChar" = "'",
| "escapeChar" = "\\")
| USING 'cat' AS (a, b, c)
| ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
| WITH SERDEPROPERTIES(
| "separatorChar" = "\t",
| "quoteChar" = "'",
| "escapeChar" = "\\")
|FROM testData
""".stripMargin,
"TRANSFORM with serde is only supported in hive mode")
}
}
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -43,6 +43,7 @@ class SparkPlanner(val session: SparkSession, val experimentalMethods: ExperimentalMethods)
      Window ::
      JoinSelection ::
      InMemoryScans ::
+     SparkScripts ::
      BasicOperators :: Nil)

/**
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala (new file)
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import java.io._

import org.apache.hadoop.conf.Configuration

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.util.CircularBuffer

/**
 * Transforms the input by forking and running the specified script.
 *
 * @param input the set of expressions that should be passed to the script.
 * @param script the command that should be executed.
 * @param output the attributes that are produced by the script.
 * @param child logical plan whose output is transformed.
 * @param ioschema the class set that defines how to handle input/output data.
 */
case class SparkScriptTransformationExec(
    input: Seq[Expression],
    script: String,
    output: Seq[Attribute],
    child: SparkPlan,
    ioschema: ScriptTransformationIOSchema)
  extends BaseScriptTransformationExec {

  override def processIterator(
      inputIterator: Iterator[InternalRow],
      hadoopConf: Configuration): Iterator[InternalRow] = {

    val (outputStream, proc, inputStream, stderrBuffer) = initProc

    val outputProjection = new InterpretedProjection(inputExpressionsWithoutSerde, child.output)

    // This new thread will consume the ScriptTransformation's input rows and write them to the
    // external process. That process's output will be read by this current thread.
    val writerThread = SparkScriptTransformationWriterThread(
      inputIterator.map(outputProjection),
      inputExpressionsWithoutSerde.map(_.dataType),
      ioschema,
      outputStream,
      proc,
      stderrBuffer,
      TaskContext.get(),
      hadoopConf
    )

    val outputIterator =
      createOutputIteratorWithoutSerde(writerThread, inputStream, proc, stderrBuffer)

    writerThread.start()

    outputIterator
  }
}

case class SparkScriptTransformationWriterThread(
    iter: Iterator[InternalRow],
    inputSchema: Seq[DataType],
    ioSchema: ScriptTransformationIOSchema,
    outputStream: OutputStream,
    proc: Process,
    stderrBuffer: CircularBuffer,
    taskContext: TaskContext,
    conf: Configuration)
  extends BaseScriptTransformationWriterThread {

  override def processRows(): Unit = {
    processRowsWithoutSerde()
  }
}
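
For intuition, here is a simplified, self-contained model of the no-serde text contract that the writer thread and output iterator agree on. The `\t` field delimiter matches the defaults above; the null marker is an assumption borrowed from Hive's convention, not taken from this diff.

```scala
// Simplified model of the no-serde round trip. Real execution forks a process
// ('cat' would echo its input unchanged); this only shows the text contract,
// not process management.
object NoSerdeWireFormatSketch {
  val fieldDelimiter = "\t"
  val nullMarker = "\\N" // assumed null representation, Hive-style

  // Serialize one row as delimiter-joined text, one row per line.
  def writeRow(row: Seq[Any]): String =
    row.map(v => if (v == null) nullMarker else v.toString).mkString(fieldDelimiter)

  // Parse one line of script output back into string fields; casting to the
  // declared output types (int, string, long, ...) happens downstream.
  def readRow(line: String): Seq[String] =
    line.split(fieldDelimiter, -1).toSeq

  def main(args: Array[String]): Unit = {
    val wire = writeRow(Seq(1, "x", 10L)) // "1\tx\t10"
    assert(readRow(wire) == Seq("1", "x", "10"))
  }
}
```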