
[SPARK-4131] [SQL] Support INSERT OVERWRITE [LOCAL] DIRECTORY '/path/to/dir' [ROW FORMAT row_format] [STORED AS file_format] query. #13067
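
A quick sketch of the syntax this PR adds, run through a SparkSession (the session name "spark", the table "src", and the output path are all illustrative):

  // Write the result of a query to a local directory as delimited text.
  spark.sql("""
    INSERT OVERWRITE LOCAL DIRECTORY '/tmp/out'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    STORED AS textfile
    SELECT id, name FROM src
  """)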

Closed
@@ -202,6 +202,7 @@ query
insertInto
: INSERT OVERWRITE TABLE tableIdentifier partitionSpec? (IF NOT EXISTS)?
| INSERT INTO TABLE? tableIdentifier partitionSpec?
| INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? (STORED AS format=IDENTIFIER)?
;

partitionSpecLocation
@@ -649,7 +650,7 @@ nonReserved
| ASC | DESC | LIMIT | RENAME | SETS
| AT | NULLS | OVERWRITE | ALL | ALTER | AS | BETWEEN | BY | CREATE | DELETE
| DESCRIBE | DROP | EXISTS | FALSE | FOR | GROUP | IN | INSERT | INTO | IS | LIKE
| NULL | ORDER | OUTER | TABLE | TRUE | WITH | RLIKE | DIRECTORY
;

SELECT: 'SELECT';
@@ -717,6 +718,7 @@ WITH: 'WITH';
VALUES: 'VALUES';
CREATE: 'CREATE';
TABLE: 'TABLE';
DIRECTORY: 'DIRECTORY';
Contributor:

Add this keyword to the nonReserved rule.

Contributor Author:

Added.
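
For context: because DIRECTORY stays in nonReserved, it can still be used as an ordinary identifier. A hedged example (table and column names are made up):

  // Should still parse after this change, since DIRECTORY is non-reserved.
  spark.sql("SELECT directory FROM file_listing")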

VIEW: 'VIEW';
REPLACE: 'REPLACE';
INSERT: 'INSERT';
@@ -55,6 +55,9 @@ object UnsupportedOperationChecker {
case _: InsertIntoTable =>
throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets")

case _: InsertIntoDir =>
throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets")

case Aggregate(_, _, child) if child.isStreaming =>
if (outputMode == Append) {
throwError(
@@ -27,6 +27,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
@@ -193,20 +194,121 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
}

/**
* A table property key can either be a String or a collection of dot separated elements. This
* function extracts the property key based on whether it's a string literal or a table property
* identifier.
*/
override def visitTablePropertyKey(key: TablePropertyKeyContext): String = {
if (key.STRING != null) {
string(key.STRING)
} else {
key.getText
}
}

/**
* Convert a table property list into a key-value map.
* This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
*/
override def visitTablePropertyList(
ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
val properties = ctx.tableProperty.asScala.map { property =>
val key = visitTablePropertyKey(property.key)
val value = Option(property.value).map(string).orNull
key -> value
}
// Check for duplicate property names.
checkDuplicateKeys(properties, ctx)
properties.toMap
}
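
A minimal standalone sketch of the key extraction and duplicate check above, using a simplified property representation in place of the ANTLR contexts (the Property type and helper names are hypothetical):

  // Stand-in for TablePropertyKeyContext: a key is either a string literal
  // or a dot-separated identifier.
  case class Property(stringKey: Option[String], identKey: String, value: String)

  def propertyKey(p: Property): String = p.stringKey.getOrElse(p.identKey)

  def toPropertyMap(props: Seq[Property]): Map[String, String] = {
    val kvs = props.map(p => propertyKey(p) -> p.value)
    // Mirrors checkDuplicateKeys: reject duplicate property names.
    val dups = kvs.groupBy(_._1).collect { case (k, vs) if vs.size > 1 => k }
    require(dups.isEmpty, s"Found duplicate keys: ${dups.mkString(", ")}")
    kvs.toMap
  }

  // toPropertyMap(Seq(Property(Some("k.1"), "", "v1"), Property(None, "k2", "v2")))
  //   == Map("k.1" -> "v1", "k2" -> "v2")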

/** Empty storage format for default values and copies. */
protected val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty)

/**
* Create a [[CatalogStorageFormat]] used for creating tables.
*
* Example format:
* {{{
* SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)]
* }}}
*
* OR
*
* {{{
* DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]]
* [COLLECTION ITEMS TERMINATED BY char]
* [MAP KEYS TERMINATED BY char]
* [LINES TERMINATED BY char]
* [NULL DEFINED AS char]
* }}}
*/
protected def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) {
ctx match {
case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
}
}

/**
* Create SERDE row format name and properties pair.
*/
override def visitRowFormatSerde(
ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) {
import ctx._
EmptyStorageFormat.copy(
serde = Option(string(name)),
serdeProperties = Option(tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty))
}

/**
* Create a delimited row format properties object.
*/
override def visitRowFormatDelimited(
ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) {
// Collect the entries if any.
def entry(key: String, value: Token): Seq[(String, String)] = {
Option(value).toSeq.map(x => key -> string(x))
}
// TODO we need proper support for the NULL format.
val entries =
entry("field.delim", ctx.fieldsTerminatedBy) ++
entry("serialization.format", ctx.fieldsTerminatedBy) ++
entry("escape.delim", ctx.escapedBy) ++
// The following typo is inherited from Hive...
entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++
entry("mapkey.delim", ctx.keysTerminatedBy) ++
Option(ctx.linesSeparatedBy).toSeq.map { token =>
val value = string(token)
assert(
value == "\n",
s"LINES TERMINATED BY only supports newline '\\n' right now: $value",
ctx)
"line.delim" -> value
}
EmptyStorageFormat.copy(serdeProperties = entries.toMap)
}
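
As an illustration of the mapping above, a standalone sketch of the entries produced for ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\' (the clause values are hardcoded here for the example):

  def entry(key: String, value: Option[String]): Seq[(String, String)] =
    value.toSeq.map(key -> _)

  val entries =
    entry("field.delim", Some(",")) ++          // FIELDS TERMINATED BY ','
    entry("serialization.format", Some(",")) ++ // mirrors the field delimiter
    entry("escape.delim", Some("\\")) ++        // ESCAPED BY '\'
    entry("colelction.delim", None) ++          // no COLLECTION ITEMS clause
    entry("mapkey.delim", None)                 // no MAP KEYS clause

  // entries.toMap == Map(
  //   "field.delim" -> ",", "serialization.format" -> ",", "escape.delim" -> "\\")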

/**
* Add an INSERT INTO [TABLE] / INSERT OVERWRITE TABLE / INSERT OVERWRITE DIRECTORY
* operation to the logical plan.
*/
private def withInsertInto(
ctx: InsertIntoContext,
query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
val tableIdent = Option(ctx.tableIdentifier).map(visitTableIdentifier)
val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty)

// Build the storage format for the DIRECTORY case; only override the row
// format's SerDe when an explicit STORED AS format is given.
val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
val storageFormat = Option(ctx.format)
  .map(format => rowStorage.copy(serde = Some(format.getText)))
  .getOrElse(rowStorage)

tableIdent match {
  case Some(ident) =>
    InsertIntoTable(
      UnresolvedRelation(ident, None),
      partitionKeys,
      query,
      ctx.OVERWRITE != null,
      ctx.EXISTS != null)
  case None =>
    InsertIntoDir(string(ctx.path), ctx.LOCAL != null, storageFormat, query)
}
}
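
A hedged end-to-end sketch of the dispatch above, assuming this builder is wired into the Catalyst parser; the table, partition, and path names are illustrative:

  import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

  // With a table identifier the statement parses to InsertIntoTable:
  CatalystSqlParser.parsePlan(
    "INSERT OVERWRITE TABLE dst PARTITION (ds='2016-05-01') SELECT * FROM src")

  // Without one it parses to the new InsertIntoDir:
  CatalystSqlParser.parsePlan(
    "INSERT OVERWRITE LOCAL DIRECTORY '/tmp/out' SELECT * FROM src")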

@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
@@ -386,6 +387,16 @@ case class InsertIntoTable(
}
}

case class InsertIntoDir(
Contributor:

You are moving quite a bit of parser code to Catalyst, and it also adds a lot of Hive parlance to Catalyst. Maybe we should just move this into SQL/core?

Contributor Author:

I could not figure out a way to push the code down without adding some extra if/else statements that treat insert statements specially. If you insist I can add the code, but I think it's better the way it is right now.

path: String,
isLocal: Boolean,
rowFormat: CatalogStorageFormat,
child: LogicalPlan)
extends LogicalPlan {
override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = Seq.empty
}

/**
* A container for holding named common table expressions (CTEs) and a query plan.
* This operator will be removed during analysis and the relations will be substituted into child.
@@ -381,22 +381,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
)
}

/**
* Convert a table property list into a key-value map.
* This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
*/
override def visitTablePropertyList(
ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) {
val properties = ctx.tableProperty.asScala.map { property =>
val key = visitTablePropertyKey(property.key)
val value = Option(property.value).map(string).orNull
key -> value
}
// Check for duplicate property names.
checkDuplicateKeys(properties, ctx)
properties.toMap
}

/**
* Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified.
*/
@@ -423,22 +407,8 @@
props.keys.toSeq
}

/**
* A table property key can either be String or a collection of dot separated elements. This
* function extracts the property key based on whether it's a string literal or a table property
* identifier.
*/
override def visitTablePropertyKey(key: TablePropertyKeyContext): String = {
if (key.STRING != null) {
string(key.STRING)
} else {
key.getText
}
}

/**
* Create a [[CreateDatabaseCommand]] command.
*
* For example:
* {{{
* CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment]
@@ -950,9 +920,6 @@
}
}

/** Empty storage format for default values and copies. */
private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty)

/**
* Create a [[CatalogStorageFormat]].
*/
@@ -980,70 +947,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
}

/**
* Create a [[CatalogStorageFormat]] used for creating tables.
*
* Example format:
* {{{
* SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)]
* }}}
*
* OR
*
* {{{
* DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]]
* [COLLECTION ITEMS TERMINATED BY char]
* [MAP KEYS TERMINATED BY char]
* [LINES TERMINATED BY char]
* [NULL DEFINED AS char]
* }}}
*/
private def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) {
ctx match {
case serde: RowFormatSerdeContext => visitRowFormatSerde(serde)
case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited)
}
}

/**
* Create SERDE row format name and properties pair.
*/
override def visitRowFormatSerde(
ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) {
import ctx._
EmptyStorageFormat.copy(
serde = Option(string(name)),
serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
}

/**
* Create a delimited row format properties object.
*/
override def visitRowFormatDelimited(
ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) {
// Collect the entries if any.
def entry(key: String, value: Token): Seq[(String, String)] = {
Option(value).toSeq.map(x => key -> string(x))
}
// TODO we need proper support for the NULL format.
val entries =
entry("field.delim", ctx.fieldsTerminatedBy) ++
entry("serialization.format", ctx.fieldsTerminatedBy) ++
entry("escape.delim", ctx.escapedBy) ++
// The following typo is inherited from Hive...
entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++
entry("mapkey.delim", ctx.keysTerminatedBy) ++
Option(ctx.linesSeparatedBy).toSeq.map { token =>
val value = string(token)
assert(
value == "\n",
s"LINES TERMINATED BY only supports newline '\\n' right now: $value",
ctx)
"line.delim" -> value
}
EmptyStorageFormat.copy(serdeProperties = entries.toMap)
}

/**
* Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT
* and STORED AS.
@@ -50,6 +50,10 @@ private[hive] trait HiveStrategies {
table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
execution.InsertIntoHiveTable(
table, partition, planLater(child), overwrite, ifNotExists) :: Nil
case logical.InsertIntoDir(
Contributor:

Is this a Hive-only feature?

Contributor Author:

This is HQL syntax, but there is no reason for it to be a Hive-only feature.

path, isLocal, rowFormat, child) =>
execution.InsertIntoDir(
path, isLocal, rowFormat, planLater(child)) :: Nil
case _ => Nil
}
}