NETFLIX-BUILD: Update CREATE/REPLACE conversions with serde.
rdblue committed Mar 25, 2020
1 parent 51593cb commit 0522c93
Showing 5 changed files with 109 additions and 41 deletions.
TableCatalog.java

@@ -46,6 +46,11 @@ public interface TableCatalog extends CatalogPlugin {
    */
   String PROP_LOCATION = "location";
 
+  /**
+   * A reserved property to specify a table was created with EXTERNAL.
+   */
+  String PROP_EXTERNAL = "external";
+
   /**
    * A reserved property to specify the description of the table.
    */
@@ -61,6 +66,11 @@ public interface TableCatalog extends CatalogPlugin {
    */
   String PROP_OWNER = "owner";
 
+  /**
+   * A prefix used to pass OPTIONS in table properties
+   */
+  String OPTION_PREFIX = "option.";
+
   /**
    * List the tables in a namespace from the catalog.
    * <p>
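For illustration only (this consumer code is not part of the commit): a minimal, self-contained Scala sketch of how a catalog implementation might consume the two new reserved keys, with the constants mirrored locally so the sketch compiles on its own.

object ReservedPropsSketch {
  // Mirrors TableCatalog.PROP_EXTERNAL and TableCatalog.OPTION_PREFIX from the hunk above.
  val PropExternal = "external"
  val OptionPrefix = "option."

  // Split a table-property map into un-prefixed OPTIONS and the remaining properties.
  def splitOptions(props: Map[String, String]): (Map[String, String], Map[String, String]) = {
    val (options, rest) = props.partition { case (key, _) => key.startsWith(OptionPrefix) }
    (options.map { case (key, value) => key.stripPrefix(OptionPrefix) -> value }, rest)
  }

  // True when the table was created with EXTERNAL.
  def isExternal(props: Map[String, String]): Boolean =
    props.get(PropExternal).contains("true")
}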
ResolveCatalogs.scala

@@ -144,53 +144,53 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
       throw new AnalysisException("Describing columns is not supported for v2 tables.")
 
     case c @ CreateTableStatement(
-        NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+        NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
       assertNoCharTypeInSchema(c.tableSchema)
       CreateV2Table(
         catalog.asTableCatalog,
         tbl.asIdentifier,
         c.tableSchema,
         // convert the bucket spec and add it as a transform
         c.partitioning ++ c.bucketSpec.map(_.asTransform),
-        convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+        convertTableProperties(c),
         ignoreIfExists = c.ifNotExists)
 
     case c @ CreateTableAsSelectStatement(
-        NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+        NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
       CreateTableAsSelect(
         catalog.asTableCatalog,
         tbl.asIdentifier,
         // convert the bucket spec and add it as a transform
         c.partitioning ++ c.bucketSpec.map(_.asTransform),
         c.asSelect,
-        convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+        convertTableProperties(c),
         writeOptions = c.options,
         ignoreIfExists = c.ifNotExists)
 
     case RefreshTableStatement(NonSessionCatalogAndTable(catalog, tbl)) =>
       RefreshTable(catalog.asTableCatalog, tbl.asIdentifier)
 
     case c @ ReplaceTableStatement(
-        NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+        NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
       assertNoCharTypeInSchema(c.tableSchema)
       ReplaceTable(
         catalog.asTableCatalog,
         tbl.asIdentifier,
         c.tableSchema,
         // convert the bucket spec and add it as a transform
         c.partitioning ++ c.bucketSpec.map(_.asTransform),
-        convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+        convertTableProperties(c),
         orCreate = c.orCreate)
 
     case c @ ReplaceTableAsSelectStatement(
-        NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+        NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
       ReplaceTableAsSelect(
         catalog.asTableCatalog,
         tbl.asIdentifier,
         // convert the bucket spec and add it as a transform
         c.partitioning ++ c.bucketSpec.map(_.asTransform),
         c.asSelect,
-        convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
+        convertTableProperties(c),
         writeOptions = c.options,
         orCreate = c.orCreate)
 
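Each branch above folds the bucket spec into the partition transforms via c.partitioning ++ c.bucketSpec.map(_.asTransform). For illustration, a standalone sketch of that shape, using simple stand-ins rather than Spark's Transform and BucketSpec types:

// Stand-in for Spark's bucket spec; asTransform here renders a string instead
// of building a connector Transform.
final case class BucketSpecSketch(numBuckets: Int, columns: Seq[String]) {
  def asTransform: String = s"bucket($numBuckets, ${columns.mkString(", ")})"
}

object BucketTransformDemo extends App {
  val partitioning = Seq("years(ts)")                      // from PARTITIONED BY
  val bucketSpec = Option(BucketSpecSketch(16, Seq("id"))) // from CLUSTERED BY ... INTO 16 BUCKETS
  // Same shape as c.partitioning ++ c.bucketSpec.map(_.asTransform)
  println(partitioning ++ bucketSpec.map(_.asTransform))   // List(years(ts), bucket(16, id))
}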
CatalogV2Util.scala

@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, UnresolvedV2Relation}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.AlterTable
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, CreateTableAsSelectStatement, CreateTableStatement, ReplaceTableAsSelectStatement, ReplaceTableStatement, SerdeInfo}
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, StructField, StructType}
@@ -295,18 +295,58 @@ private[sql] object CatalogV2Util {
     catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
   }
 
-  def convertTableProperties(
+  def convertTableProperties(c: CreateTableStatement): Map[String, String] = {
+    convertTableProperties(
+      c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external)
+  }
+
+  def convertTableProperties(c: CreateTableAsSelectStatement): Map[String, String] = {
+    convertTableProperties(
+      c.properties, c.options, c.serde, c.location, c.comment, c.provider, c.external)
+  }
+
+  def convertTableProperties(r: ReplaceTableStatement): Map[String, String] = {
+    convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
+  }
+
+  def convertTableProperties(r: ReplaceTableAsSelectStatement): Map[String, String] = {
+    convertTableProperties(r.properties, r.options, r.serde, r.location, r.comment, r.provider)
+  }
+
+  private def convertTableProperties(
       properties: Map[String, String],
       options: Map[String, String],
+      serdeInfo: Option[SerdeInfo],
       location: Option[String],
       comment: Option[String],
-      provider: Option[String]): Map[String, String] = {
-    properties ++ options ++
+      provider: Option[String],
+      external: Boolean = false): Map[String, String] = {
+    properties ++
+      options.map { case (key, value) => TableCatalog.OPTION_PREFIX + key -> value } ++
+      convertToProperties(serdeInfo) ++
+      (if (external) Map(TableCatalog.PROP_EXTERNAL -> "true") else Map.empty) ++
       provider.map(TableCatalog.PROP_PROVIDER -> _) ++
       comment.map(TableCatalog.PROP_COMMENT -> _) ++
       location.map(TableCatalog.PROP_LOCATION -> _)
   }
 
+  private def convertToProperties(serdeInfo: Option[SerdeInfo]): Map[String, String] = {
+    serdeInfo match {
+      case Some(s) =>
+        (s.formatClasses match {
+          case Some((inputFormat, outputFormat)) =>
+            Map("hive.input-format" -> inputFormat, "hive.output-format" -> outputFormat)
+          case _ =>
+            Map.empty
+        }) ++
+          s.storedAs.map("hive.stored-as" -> _) ++
+          s.serde.map("hive.serde" -> _) ++
+          s.serdeProperties.map { case (key, value) => TableCatalog.OPTION_PREFIX + key -> value }
+      case None =>
+        Map.empty
+    }
+  }
+
   def withDefaultOwnership(properties: Map[String, String]): Map[String, String] = {
     properties ++ Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName())
   }
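To make the merge concrete, a self-contained sketch of the conversion (a local re-implementation with a pared-down SerdeInfo; the key names are the ones used in the hunk above, everything else is illustrative and not Spark's actual API):

final case class SerdeInfoSketch(
    storedAs: Option[String] = None,
    serde: Option[String] = None,
    serdeProperties: Map[String, String] = Map.empty)

object ConvertTablePropertiesDemo extends App {
  val OptionPrefix = "option."

  def convert(
      properties: Map[String, String],
      options: Map[String, String],
      serdeInfo: Option[SerdeInfoSketch],
      location: Option[String],
      comment: Option[String],
      provider: Option[String],
      external: Boolean = false): Map[String, String] = {
    properties ++
      options.map { case (key, value) => OptionPrefix + key -> value } ++
      serdeInfo.toSeq.flatMap { s =>
        s.storedAs.map("hive.stored-as" -> _).toSeq ++
          s.serde.map("hive.serde" -> _).toSeq ++
          s.serdeProperties.map { case (key, value) => OptionPrefix + key -> value }
      } ++
      (if (external) Map("external" -> "true") else Map.empty) ++
      provider.map("provider" -> _) ++
      comment.map("comment" -> _) ++
      location.map("location" -> _)
  }

  // A hypothetical EXTERNAL table with one OPTION and STORED AS orc.
  val props = convert(
    properties = Map("owner" -> "etl"),
    options = Map("compression" -> "zstd"),
    serdeInfo = Some(SerdeInfoSketch(storedAs = Some("orc"))),
    location = Some("/warehouse/t"),
    comment = Some("demo table"),
    provider = Some("hive"),
    external = true)
  // Resulting map contains: owner -> etl, option.compression -> zstd,
  // hive.stored-as -> orc, external -> true, provider -> hive,
  // comment -> demo table, location -> /warehouse/t
  println(props)
}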
DDLParserSuite.scala

@@ -64,6 +64,7 @@ class DDLParserSuite extends AnalysisTest {
       Some("parquet"),
       Map.empty[String, String],
       None,
+      None,
       None)
 
     Seq(createSql, replaceSql).foreach { sql =>
@@ -87,6 +88,7 @@
       Some("parquet"),
       Map.empty[String, String],
       None,
+      None,
       None),
       expectedIfNotExists = true)
   }
@@ -107,6 +109,7 @@
       Some("parquet"),
       Map.empty[String, String],
       None,
+      None,
       None)
     Seq(createSql, replaceSql).foreach { sql =>
       testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
@@ -161,6 +164,7 @@
       Some("parquet"),
       Map.empty[String, String],
       None,
+      None,
       None)
     Seq(createSql, replaceSql).foreach { sql =>
       testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
@@ -183,6 +187,7 @@
       Some("parquet"),
       Map.empty[String, String],
       None,
+      None,
       None)
     Seq(createSql, replaceSql).foreach { sql =>
       testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
@@ -201,7 +206,8 @@
       Some("parquet"),
       Map.empty[String, String],
       None,
-      Some("abc"))
+      Some("abc"),
+      None)
     Seq(createSql, replaceSql).foreach{ sql =>
       testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
     }
@@ -221,6 +227,7 @@
       Some("parquet"),
       Map.empty[String, String],
       None,
+      None,
       None)
     Seq(createSql, replaceSql).foreach { sql =>
       testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
@@ -239,6 +246,7 @@
       Some("parquet"),
       Map.empty[String, String],
       Some("/tmp/file"),
+      None,
       None)
     Seq(createSql, replaceSql).foreach { sql =>
       testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
@@ -257,6 +265,7 @@
       Some("parquet"),
       Map.empty[String, String],
       None,
+      None,
       None)
     Seq(createSql, replaceSql).foreach { sql =>
       testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
@@ -318,6 +327,7 @@
       Some("json"),
       Map("a" -> "1", "b" -> "0.1", "c" -> "true"),
       None,
+      None,
       None),
       expectedIfNotExists = false)
   }
@@ -373,7 +383,8 @@
       Some("parquet"),
       Map.empty[String, String],
       Some("/user/external/page_view"),
-      Some("This is the staging page view table"))
+      Some("This is the staging page view table"),
+      None)
     Seq(s1, s2, s3, s4).foreach { sql =>
       testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = true)
     }
@@ -2089,7 +2100,9 @@
       provider: Option[String],
       options: Map[String, String],
       location: Option[String],
-      comment: Option[String])
+      comment: Option[String],
+      serdeInfo: Option[SerdeInfo],
+      external: Boolean = false)
 
   private object TableSpec {
     def apply(plan: LogicalPlan): TableSpec = {
@@ -2104,7 +2117,9 @@
            create.provider,
            create.options,
            create.location,
-            create.comment)
+            create.comment,
+            create.serde,
+            create.external)
        case replace: ReplaceTableStatement =>
          TableSpec(
            replace.tableName,
@@ -2115,7 +2130,8 @@
            replace.provider,
            replace.options,
            replace.location,
-            replace.comment)
+            replace.comment,
+            replace.serde)
        case ctas: CreateTableAsSelectStatement =>
          TableSpec(
            ctas.tableName,
@@ -2126,7 +2142,9 @@
            ctas.provider,
            ctas.options,
            ctas.location,
-            ctas.comment)
+            ctas.comment,
+            ctas.serde,
+            ctas.external)
        case rtas: ReplaceTableAsSelectStatement =>
          TableSpec(
            rtas.tableName,
@@ -2137,7 +2155,8 @@
            rtas.provider,
            rtas.options,
            rtas.location,
-            rtas.comment)
+            rtas.comment,
+            rtas.serde)
        case other =>
          fail(s"Expected to parse Create, CTAS, Replace, or RTAS plan" +
            s" from query, got ${other.getClass.getName}.")
@@ -2164,20 +2183,19 @@
   }
 
   test("create table - without using") {
-    withSQLConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key -> "false") {
-      val sql = "CREATE TABLE 1m.2g(a INT)"
-      val expectedTableSpec = TableSpec(
-        Seq("1m", "2g"),
-        Some(new StructType().add("a", IntegerType)),
-        Seq.empty[Transform],
-        None,
-        Map.empty[String, String],
-        None,
-        Map.empty[String, String],
-        None,
-        None)
+    val sql = "CREATE TABLE 1m.2g(a INT)"
+    val expectedTableSpec = TableSpec(
+      Seq("1m", "2g"),
+      Some(new StructType().add("a", IntegerType)),
+      Seq.empty[Transform],
+      None,
+      Map.empty[String, String],
+      None,
+      Map.empty[String, String],
+      None,
+      None,
+      None)
 
-      testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
-    }
+    testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
   }
 }
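Because the expected specs above are built positionally, the trailing Nones are easy to misread. A self-contained sketch of the updated field order (the Spark types are replaced with simple stand-ins, and the values are hypothetical, not from the suite):

// Mirrors the TableSpec helper above, with the two fields this commit appends.
final case class TableSpecSketch(
    name: Seq[String],
    schema: Option[String],          // stand-in for Option[StructType]
    partitioning: Seq[String],       // stand-in for Seq[Transform]
    bucketSpec: Option[String],
    properties: Map[String, String],
    provider: Option[String],
    options: Map[String, String],
    location: Option[String],
    comment: Option[String],
    serdeInfo: Option[String],       // new in this commit
    external: Boolean = false)       // new in this commit

object TableSpecDemo extends App {
  // Roughly: CREATE TABLE my_tab (a INT) USING parquet
  val spec = TableSpecSketch(
    Seq("my_tab"), Some("a INT"), Nil, None, Map.empty,
    Some("parquet"), Map.empty, None, None, None)
  println(spec)
}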
(Diff for the fifth changed file not shown.)
