
Commit a8575a0

Add SerDe support for CTAS
1 parent f5de7de commit a8575a0

6 files changed: +393 −79 lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 30 additions & 33 deletions
@@ -423,58 +423,55 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
-
-      case CreateTableAsSelect(desc, child, allowExisting) =>
-        if (hive.convertCTAS && !desc.serde.isDefined) {
-          // Do the conversion when spark.sql.hive.convertCTAS is true and the query
-          // does not specify any storage format (file format and storage handler).
-          if (desc.specifiedDatabase.isDefined) {
-            throw new AnalysisException(
-              "Cannot specify database name in a CTAS statement " +
-              "when spark.sql.hive.convertCTAS is set to true.")
-          }
-
-          val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
-          CreateTableUsingAsSelect(
-            desc.name,
-            hive.conf.defaultDataSourceName,
-            temporary = false,
-            mode,
-            options = Map.empty[String, String],
-            child
-          )
+      case p: LogicalPlan if p.resolved => p
+      case p @ CreateTableAsSelect(table, child, allowExisting) =>
+        val schema = if (table.schema.size > 0) {
+          table.schema
         } else {
-          execution.CreateTableAsSelect(
-            desc.copy(
-              specifiedDatabase = Option(desc.specifiedDatabase.getOrElse(client.currentDatabase))),
-            child,
-            allowExisting)
+          child.output.map {
+            attr => new HiveColumn(attr.name, toMetastoreType(attr.dataType), null)
+          }
         }

-      case p: LogicalPlan if p.resolved => p
+        val desc = table.copy(schema = schema)

-      case p @ CreateTableAsSelect(desc, child, allowExisting) =>
-        val (dbName, tblName) = processDatabaseAndTableName(desc.database, desc.name)
+        // This is a hack; we only treat RC, ORC and Parquet as specific storage.
+        // Otherwise, we will convert it into Parquet2 when hive.convertCTAS is specified.
+        val specificStorage = (table.inputFormat.map(format => {
+          // org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat => Parquet
+          // org.apache.hadoop.hive.ql.io.orc.OrcInputFormat => Orc
+          // org.apache.hadoop.hive.ql.io.RCFileInputFormat => RCFile
+          // parquet.hive.DeprecatedParquetInputFormat => Parquet
+          // TODO configurable?
+          format.contains("Orc") || format.contains("Parquet") || format.contains("RCFile")
+        }).getOrElse(false))

-        if (hive.convertCTAS) {
-          if (desc.specifiedDatabase.isDefined) {
+        if (hive.convertCTAS && !specificStorage) {
+          // Do the conversion when spark.sql.hive.convertCTAS is true and the query
+          // does not specify any storage format (file format and storage handler).
+          if (table.specifiedDatabase.isDefined) {
             throw new AnalysisException(
               "Cannot specify database name in a CTAS statement " +
-              "when spark.sql.hive.convertCTAS is set to true.")
+                "when spark.sql.hive.convertCTAS is set to true.")
           }

           val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
           CreateTableUsingAsSelect(
-            tblName,
+            desc.name,
             hive.conf.defaultDataSourceName,
             temporary = false,
             mode,
             options = Map.empty[String, String],
             child
           )
         } else {
+          val (dbName, tblName) =
+            processDatabaseAndTableName(
+              table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
           execution.CreateTableAsSelect(
-            desc,
+            desc.copy(
+              specifiedDatabase = Some(dbName),
+              name = tblName),
             child,
             allowExisting)
         }
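
The rewritten rule reduces the conversion decision to one predicate: rewrite a CTAS to the default data source only when spark.sql.hive.convertCTAS is enabled and the statement does not name one of the "specific" Hive storage formats (RCFile, ORC, Parquet). A minimal standalone sketch of that gate, using illustrative names that are not part of the patch:

    // Sketch of the `specificStorage` gate in the rule above.
    // `inputFormat` is the table's input format class name, if one was specified.
    def convertsToDefaultDataSource(convertCTAS: Boolean, inputFormat: Option[String]): Boolean = {
      val specificStorage = inputFormat.exists { format =>
        format.contains("Orc") || format.contains("Parquet") || format.contains("RCFile")
      }
      convertCTAS && !specificStorage
    }

    // convertsToDefaultDataSource(true, Some("org.apache.hadoop.mapred.TextInputFormat"))        // true
    // convertsToDefaultDataSource(true, Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat")) // false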

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 177 additions & 35 deletions
@@ -22,14 +22,15 @@ import java.sql.Date
 import scala.collection.mutable.ArrayBuffer

 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
 import org.apache.hadoop.hive.ql.lib.Node
-import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse._
-import org.apache.hadoop.hive.ql.plan.PlanUtils
-import org.apache.spark.sql.{AnalysisException, SparkSQLParser}
+import org.apache.hadoop.hive.ql.plan._
+import org.apache.hadoop.hive.ql.session.SessionState

+import org.apache.spark.sql.{AnalysisException, SparkSQLParser}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
@@ -62,7 +63,8 @@ case class CreateTableAsSelect(
     allowExisting: Boolean) extends UnaryNode with Command {

   override def output: Seq[Attribute] = Seq.empty[Attribute]
-  override lazy val resolved: Boolean = tableDesc.specifiedDatabase.isDefined && childrenResolved
+  override lazy val resolved: Boolean =
+    tableDesc.specifiedDatabase.isDefined && tableDesc.schema.size > 0 && childrenResolved
 }

 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
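
With the added `tableDesc.schema.size > 0` conjunct, a CreateTableAsSelect node keeps reporting itself unresolved until a schema has been attached, which is what lets the CreateTables rule in HiveMetastoreCatalog.scala above derive the schema from the child plan's output first. A toy sketch of the gate, using hypothetical simplified types rather than the real plan classes:

    // Hypothetical, simplified illustration of the resolution gate.
    case class TableDesc(specifiedDatabase: Option[String], schema: Seq[(String, String)])

    def ctasResolved(tableDesc: TableDesc, childrenResolved: Boolean): Boolean =
      tableDesc.specifiedDatabase.isDefined && tableDesc.schema.size > 0 && childrenResolved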
@@ -243,12 +245,24 @@ private[hive] object HiveQl {
     * Otherwise, there will be a NullPointerException
     * when retrieving properties from HiveConf.
     */
-    val hContext = new Context(new HiveConf())
+    val hContext = new Context(hiveConf)
     val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext))
     hContext.clear()
     node
   }

+  /**
+   * Returns the HiveConf.
+   * TODO get it from HiveContext?
+   */
+  private[this] def hiveConf(): HiveConf = {
+    val ss = SessionState.get() // SessionState is initialized lazily, so it can be null here
+    if (ss == null) {
+      new HiveConf()
+    } else {
+      ss.getConf
+    }
+  }

   /** Returns a LogicalPlan for a given HiveQL string. */
   def parseSql(sql: String): LogicalPlan = hqlParser.parse(sql)
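
SessionState.get() is a thread-local lookup and the session is created lazily, so the helper must tolerate a null result; when a session exists, reusing its conf lets the parser observe session-level settings such as hive.default.fileformat, which the CTAS defaults below read. The same fallback can be written null-safely with Option (a standalone sketch, not part of the patch):

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.session.SessionState

    // Fall back to a fresh HiveConf when no Hive session has been started yet.
    def currentHiveConf(): HiveConf =
      Option(SessionState.get()).map(_.getConf).getOrElse(new HiveConf())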
@@ -479,8 +493,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       DropTable(tableName, ifExists.nonEmpty)
     // Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan"
     case Token("TOK_ANALYZE",
-           Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
-           isNoscan) =>
+            Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) ::
+            isNoscan) =>
       // Reference:
       // https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables
       if (partitionSpec.nonEmpty) {
@@ -550,13 +564,15 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
       val (
           Some(tableNameParts) ::
           _ /* likeTable */ ::
+          externalTable ::
           Some(query) ::
           allowExisting +:
           ignores) =
         getClauses(
           Seq(
             "TOK_TABNAME",
             "TOK_LIKETABLE",
+            "EXTERNAL",
             "TOK_QUERY",
             "TOK_IFNOTEXISTS",
             "TOK_TABLECOMMENT",
@@ -579,43 +595,156 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             children)
       val (db, tableName) = extractDbNameTableName(tableNameParts)

-      var tableDesc =
-        HiveTable(
-          specifiedDatabase = db,
-          name = tableName,
-          schema = Seq.empty,
-          partitionColumns = Seq.empty,
-          properties = Map.empty,
-          serdeProperties = Map.empty,
-          tableType = ManagedTable,
-          location = None,
-          inputFormat = None,
-          outputFormat = None,
-          serde = None)
-
-      // TODO: Handle all the cases here...
-      children.foreach {
-        case Token("TOK_TBLRCFILE", Nil) =>
-          import org.apache.hadoop.hive.ql.io.{RCFileInputFormat, RCFileOutputFormat}
+      // TODO add bucket support
+      var tableDesc: HiveTable = HiveTable(
+        specifiedDatabase = db,
+        name = tableName,
+        schema = Seq.empty[HiveColumn],
+        partitionColumns = Seq.empty[HiveColumn],
+        properties = Map[String, String](),
+        serdeProperties = Map[String, String](),
+        tableType = if (externalTable.isDefined) ExternalTable else ManagedTable,
+        location = None,
+        inputFormat = None,
+        outputFormat = None,
+        serde = None,
+        viewText = None)
+
+      // default serde & input/output format
+      tableDesc = if ("SequenceFile".equalsIgnoreCase(
+          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        tableDesc.copy(
+          inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+          outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"),
+          serde = Option("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+      } else if ("RCFile".equalsIgnoreCase(
+          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        tableDesc.copy(
+          inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
+          serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
+      } else if ("ORC".equalsIgnoreCase(
+          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        tableDesc.copy(
+          inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
+          outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
+          serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+      } else if ("PARQUET".equalsIgnoreCase(
+          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        tableDesc.copy(
+          inputFormat =
+            Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+          outputFormat =
+            Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
+          serde =
+            Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+      } else {
+        tableDesc.copy(
+          inputFormat =
+            Option("org.apache.hadoop.mapred.TextInputFormat"),
+          outputFormat =
+            Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
+      }
+
+      children.collect {
+        case list @ Token("TOK_TABCOLLIST", _) =>
+          val cols = BaseSemanticAnalyzer.getColumns(list, true)
+          if (cols != null) {
+            tableDesc = tableDesc.copy(
+              schema = cols.map { field =>
+                HiveColumn(field.getName, field.getType, field.getComment)
+              })
+          }
+        case Token("TOK_TABLECOMMENT", child :: Nil) =>
+          val comment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+          // TODO support the sql text
+          tableDesc = tableDesc.copy(viewText = Option(comment))
+        case Token("TOK_TABLEPARTCOLS", list @ Token("TOK_TABCOLLIST", _) :: Nil) =>
+          val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
+          if (cols != null) {
+            tableDesc = tableDesc.copy(
+              partitionColumns = cols.map { field =>
+                HiveColumn(field.getName, field.getType, field.getComment)
+              })
+          }
+        case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil) =>
+          val serdeParams = new java.util.HashMap[String, String]()
+          child match {
+            case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) =>
+              val fieldDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild1.getText())
+              serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim)
+              serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim)
+              if (rowChild2.length > 1) {
+                val fieldEscape = BaseSemanticAnalyzer.unescapeSQLString(rowChild2(0).getText)
+                serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape)
+              }
+            case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) =>
+              val collItemDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim)
+            case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) =>
+              val mapKeyDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim)
+            case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) =>
+              val lineDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              if (!(lineDelim == "\n") && !(lineDelim == "10")) {
+                throw new AnalysisException(
+                  SemanticAnalyzer.generateErrorMessage(
+                    rowChild,
+                    ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg))
+              }
+              serdeParams.put(serdeConstants.LINE_DELIM, lineDelim)
+            case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) =>
+              val nullFormat = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+              // TODO support the nullFormat
+            case _ => assert(false)
+          }
+          tableDesc = tableDesc.copy(
+            serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+        case Token("TOK_TABLELOCATION", child :: Nil) =>
+          var location = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+          location = EximUtil.relativeToAbsolutePath(hiveConf, location)
+          tableDesc = tableDesc.copy(location = Option(location))
+        case Token("TOK_TABLESERIALIZER", child :: Nil) =>
           tableDesc = tableDesc.copy(
-            outputFormat = Option(classOf[RCFileOutputFormat].getName),
-            inputFormat = Option(classOf[RCFileInputFormat[_, _]].getName))
+            serde = Option(BaseSemanticAnalyzer.unescapeSQLString(child.getChild(0).getText)))
+          if (child.getChildCount == 2) {
+            val serdeParams = new java.util.HashMap[String, String]()
+            BaseSemanticAnalyzer.readProps(
+              (child.getChild(1).getChild(0)).asInstanceOf[ASTNode], serdeParams)
+            tableDesc = tableDesc.copy(serdeProperties = tableDesc.serdeProperties ++ serdeParams)
+          }
+        case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
+          throw new SemanticException(
+            s"Unrecognized file format in STORED AS clause: ${child.getText}")

+        case Token("TOK_TBLRCFILE", Nil) =>
+          tableDesc = tableDesc.copy(
+            inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
           if (tableDesc.serde.isEmpty) {
             tableDesc = tableDesc.copy(
               serde = Option("org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"))
           }
+
         case Token("TOK_TBLORCFILE", Nil) =>
           tableDesc = tableDesc.copy(
             inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
-            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
-            serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+            outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+          if (tableDesc.serde.isEmpty) {
+            tableDesc = tableDesc.copy(
+              serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+          }

         case Token("TOK_TBLPARQUETFILE", Nil) =>
           tableDesc = tableDesc.copy(
-            inputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
-            outputFormat = Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"),
-            serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+            inputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
+            outputFormat =
+              Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+          if (tableDesc.serde.isEmpty) {
+            tableDesc = tableDesc.copy(
+              serde = Option("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+          }

         case Token("TOK_TABLESERIALIZER",
                Token("TOK_SERDENAME", Token(serdeName, Nil) :: otherProps) :: Nil) =>
@@ -630,13 +759,26 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C

         case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
           tableDesc = tableDesc.copy(properties = tableDesc.properties ++ getProperties(list))
+        case list @ Token("TOK_TABLEFILEFORMAT", _) =>
+          tableDesc = tableDesc.copy(
+            inputFormat =
+              Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(0).getText)),
+            outputFormat =
+              Option(BaseSemanticAnalyzer.unescapeSQLString(list.getChild(1).getText)))
+        case Token("TOK_STORAGEHANDLER", _) =>
+          throw new AnalysisException(ErrorMsg.CREATE_NON_NATIVE_AS.getMsg())
+        case _ => // Unsupported features
+      }

-        case _ =>
+      if (tableDesc.serde.isEmpty) {
+        // add default serde
+        tableDesc = tableDesc.copy(
+          serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
       }

       CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)

-    // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
+    // If it's not a "CTAS" like the above, treat it as a native command.
     case Token("TOK_CREATETABLE", _) => NativePlaceholder

     // Support "TRUNCATE TABLE table_name [PARTITION partition_spec]"

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala

Lines changed: 6 additions & 0 deletions
@@ -225,6 +225,12 @@ private[hive] class ClientWrapper(
       table.partitionColumns.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
     table.properties.foreach { case (k, v) => qlTable.setProperty(k, v) }
     table.serdeProperties.foreach { case (k, v) => qlTable.setSerdeParam(k, v) }
+
+    // set owner
+    qlTable.setOwner(conf.getUser)
+    // set create time
+    qlTable.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
     version match {
       case hive.v12 =>
         table.location.map(new URI(_)).foreach(u => qlTable.call[URI, Unit]("setDataLocation", u))
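
qlTable.setCreateTime expects whole seconds since the epoch as an Int, hence the division by 1000 before the narrowing cast. The same conversion in slightly more idiomatic Scala:

    // Epoch milliseconds to whole seconds; `.toInt` performs the same
    // Long-to-Int narrowing as `asInstanceOf[Int]` does here.
    val createTimeSeconds: Int = (System.currentTimeMillis() / 1000).toInt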
