Commit b5297c4

LantaoJin authored and dongjoon-hyun committed
[SPARK-20680][SQL] Spark-sql do not support for creating table with void column datatype
### What changes were proposed in this pull request?

This is the new PR to address the closed one #17953.

1. Support the "void" primitive data type in the `AstBuilder`, pointing it to `NullType`.
2. Forbid creating tables with a VOID/NULL column type.

### Why are the changes needed?

1. Spark is incompatible with the Hive void type. When a Hive table schema contains a void type, DESC table throws an exception in Spark.

> hive> create table bad as select 1 x, null z from dual;
> hive> describe bad;
> OK
> x    int
> z    void

In Spark 2.0.x, the behaviour when reading this table is normal:

> spark-sql> describe bad;
> x    int    NULL
> z    void    NULL
> Time taken: 4.431 seconds, Fetched 2 row(s)

But in the latest Spark version, it fails with a SparkException: Cannot recognize hive type string: void

> spark-sql> describe bad;
> 17/05/09 03:12:08 ERROR thriftserver.SparkSQLDriver: Failed in [describe bad]
> org.apache.spark.SparkException: Cannot recognize hive type string: void
> Caused by: org.apache.spark.sql.catalyst.parser.ParseException:
> DataType void() is not supported.(line 1, pos 0)
>
> == SQL ==
> void
> ^^^
> ... 61 more
> org.apache.spark.SparkException: Cannot recognize hive type string: void

2. Hive CTAS statements throw an error when the select clause has a NULL/VOID type column, since HIVE-11217.

In Spark, creating a table with a VOID/NULL column should throw a readable exception message, covering:

- create data source table (using parquet, json, ...)
- create hive table (with or without stored as)
- CTAS

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Add unit tests

Closes #28833 from LantaoJin/SPARK-20680_COPY.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 3659611 commit b5297c4
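To make the intended behavior concrete, here is a sketch of the before/after semantics from a Scala spark-shell session. It is illustrative only: the table and column names are hypothetical, and the error text comes from the `failNullType` helper added in this patch.

```scala
// Hypothetical spark-shell session (Spark with Hive support enabled);
// the table name "bad" mirrors the Hive example above.

// DESCRIBE on a Hive table with a void column no longer throws
// "Cannot recognize hive type string: void" — the parser maps "void" to NullType:
spark.sql("DESCRIBE bad").show()

// But declaring a VOID column in a new table is rejected during analysis:
spark.sql("CREATE TABLE t1 (z VOID) USING parquet")
// org.apache.spark.sql.AnalysisException: Cannot create tables with unknown type.
```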

File tree

20 files changed: +191 -14 lines changed

python/pyspark/sql/types.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -116,6 +116,9 @@ class NullType(DataType):
 
     __metaclass__ = DataTypeSingleton
 
+    def simpleString(self):
+        return 'unknown'
+
 
 class AtomicType(DataType):
     """An internal type used to represent everything that is not
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -34,6 +34,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case AlterTableAddColumnsStatement(
         nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failNullType(c.dataType))
       cols.foreach(c => failCharType(c.dataType))
       val changes = cols.map { col =>
         TableChange.addColumn(
@@ -47,6 +48,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case AlterTableReplaceColumnsStatement(
         nameParts @ NonSessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failNullType(c.dataType))
       cols.foreach(c => failCharType(c.dataType))
       val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
         case Some(table) =>
@@ -69,6 +71,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case a @ AlterTableAlterColumnStatement(
         nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
+      a.dataType.foreach(failNullType)
       a.dataType.foreach(failCharType)
       val colName = a.column.toArray
       val typeChange = a.dataType.map { newDataType =>
@@ -145,6 +148,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ CreateTableStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoNullTypeInSchema(c.tableSchema)
       assertNoCharTypeInSchema(c.tableSchema)
       CreateV2Table(
         catalog.asTableCatalog,
@@ -157,6 +161,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ CreateTableAsSelectStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+      if (c.asSelect.resolved) {
+        assertNoNullTypeInSchema(c.asSelect.schema)
+      }
       CreateTableAsSelect(
         catalog.asTableCatalog,
         tbl.asIdentifier,
@@ -172,6 +179,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ ReplaceTableStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoNullTypeInSchema(c.tableSchema)
       assertNoCharTypeInSchema(c.tableSchema)
       ReplaceTable(
         catalog.asTableCatalog,
@@ -184,6 +192,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case c @ ReplaceTableAsSelectStatement(
         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+      if (c.asSelect.resolved) {
+        assertNoNullTypeInSchema(c.asSelect.schema)
+      }
       ReplaceTableAsSelect(
         catalog.asTableCatalog,
         tbl.asIdentifier,
```

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -2203,6 +2203,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
         DecimalType(precision.getText.toInt, 0)
       case ("decimal" | "dec" | "numeric", precision :: scale :: Nil) =>
         DecimalType(precision.getText.toInt, scale.getText.toInt)
+      case ("void", Nil) => NullType
       case ("interval", Nil) => CalendarIntervalType
       case (dt, params) =>
         val dtStr = if (params.nonEmpty) s"$dt(${params.mkString(",")})" else dt
```
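The new parser case can be exercised directly through `CatalystSqlParser`; a minimal sketch (the assertion mirrors the `DataTypeParserSuite` change further down):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.types.NullType

// "void" now parses to NullType instead of falling through to the
// "DataType void() is not supported" error case.
assert(CatalystSqlParser.parseDataType("void") == NullType)
```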

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala

Lines changed: 20 additions & 1 deletion
```diff
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.AlterTable
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, HIVE_TYPE_STRING, HiveStringType, MapType, NullType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils
 
@@ -346,4 +346,23 @@ private[sql] object CatalogV2Util {
       }
     }
   }
+
+  def failNullType(dt: DataType): Unit = {
+    def containsNullType(dt: DataType): Boolean = dt match {
+      case ArrayType(et, _) => containsNullType(et)
+      case MapType(kt, vt, _) => containsNullType(kt) || containsNullType(vt)
+      case StructType(fields) => fields.exists(f => containsNullType(f.dataType))
+      case _ => dt.isInstanceOf[NullType]
+    }
+    if (containsNullType(dt)) {
+      throw new AnalysisException(
+        "Cannot create tables with unknown type.")
+    }
+  }
+
+  def assertNoNullTypeInSchema(schema: StructType): Unit = {
+    schema.foreach { f =>
+      failNullType(f.dataType)
+    }
+  }
 }
```
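Note that `containsNullType` recurses through arrays, maps, and structs, so a NULL type nested anywhere inside a column's type is rejected, not just top-level columns. A quick sketch of how the helpers behave (since `CatalogV2Util` is `private[sql]`, assume this runs inside Spark's own sql package, e.g. in a test suite):

```scala
import org.apache.spark.sql.connector.catalog.CatalogV2Util.{assertNoNullTypeInSchema, failNullType}
import org.apache.spark.sql.types._

// Each of these throws AnalysisException("Cannot create tables with unknown type."):
failNullType(NullType)
failNullType(ArrayType(NullType))
assertNoNullTypeInSchema(new StructType().add("m", MapType(StringType, NullType)))
```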

sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -32,6 +32,10 @@ class NullType private() extends DataType {
   override def defaultSize: Int = 1
 
   private[spark] override def asNullable: NullType = this
+
+  // "null" is mainly used to represent a literal in Spark,
+  // it's better to avoid using it for data types.
+  override def simpleString: String = "unknown"
 }
 
 /**
```
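With this override, any schema rendered through `simpleString` shows NULL-typed fields as `unknown`; a small sketch:

```scala
import org.apache.spark.sql.types._

NullType.simpleString                              // "unknown"
new StructType().add("z", NullType).simpleString   // "struct<z:unknown>"
```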

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -61,6 +61,7 @@ class DataTypeParserSuite extends SparkFunSuite {
   checkDataType("varchAr(20)", StringType)
   checkDataType("cHaR(27)", StringType)
   checkDataType("BINARY", BinaryType)
+  checkDataType("void", NullType)
   checkDataType("interval", CalendarIntervalType)
 
   checkDataType("array<doublE>", ArrayType(DoubleType, true))
```

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 11 additions & 0 deletions
```diff
@@ -48,6 +48,7 @@ class ResolveSessionCatalog(
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case AlterTableAddColumnsStatement(
         nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failNullType(c.dataType))
       loadTable(catalog, tbl.asIdentifier).collect {
         case v1Table: V1Table =>
           if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
@@ -76,6 +77,7 @@ class ResolveSessionCatalog(
 
     case AlterTableReplaceColumnsStatement(
         nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
+      cols.foreach(c => failNullType(c.dataType))
       val changes: Seq[TableChange] = loadTable(catalog, tbl.asIdentifier) match {
         case Some(_: V1Table) =>
           throw new AnalysisException("REPLACE COLUMNS is only supported with v2 tables.")
@@ -100,6 +102,7 @@ class ResolveSessionCatalog(
 
     case a @ AlterTableAlterColumnStatement(
         nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
+      a.dataType.foreach(failNullType)
       loadTable(catalog, tbl.asIdentifier).collect {
         case v1Table: V1Table =>
           if (!DDLUtils.isHiveTable(v1Table.v1Table)) {
@@ -268,6 +271,7 @@ class ResolveSessionCatalog(
     // session catalog and the table provider is not v2.
     case c @ CreateTableStatement(
         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoNullTypeInSchema(c.tableSchema)
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         if (!DDLUtils.isHiveTable(Some(provider))) {
@@ -292,6 +296,9 @@ class ResolveSessionCatalog(
 
     case c @ CreateTableAsSelectStatement(
         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+      if (c.asSelect.resolved) {
+        assertNoNullTypeInSchema(c.asSelect.schema)
+      }
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
@@ -319,6 +326,7 @@ class ResolveSessionCatalog(
     // session catalog and the table provider is not v2.
     case c @ ReplaceTableStatement(
         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+      assertNoNullTypeInSchema(c.tableSchema)
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw new AnalysisException("REPLACE TABLE is only supported with v2 tables.")
@@ -336,6 +344,9 @@ class ResolveSessionCatalog(
 
     case c @ ReplaceTableAsSelectStatement(
         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
+      if (c.asSelect.resolved) {
+        assertNoNullTypeInSchema(c.asSelect.schema)
+      }
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 tables.")
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 3 additions & 0 deletions
```diff
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.assertNoNullTypeInSchema
 import org.apache.spark.sql.connector.expressions.{FieldReference, RewritableTransform}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
@@ -292,6 +293,8 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[LogicalPlan] {
         "in the table definition of " + table.identifier,
         sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
+      assertNoNullTypeInSchema(schema)
+
       val normalizedPartCols = normalizePartitionColumns(schema, table)
       val normalizedBucketSpec = normalizeBucketSpec(schema, table)
 
```
sql/core/src/test/resources/sql-functions/sql-expression-schema.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -34,7 +34,7 @@
 | org.apache.spark.sql.catalyst.expressions.Ascii | ascii | SELECT ascii('222') | struct<ascii(222):int> |
 | org.apache.spark.sql.catalyst.expressions.Asin | asin | SELECT asin(0) | struct<ASIN(CAST(0 AS DOUBLE)):double> |
 | org.apache.spark.sql.catalyst.expressions.Asinh | asinh | SELECT asinh(0) | struct<ASINH(CAST(0 AS DOUBLE)):double> |
-| org.apache.spark.sql.catalyst.expressions.AssertTrue | assert_true | SELECT assert_true(0 < 1) | struct<assert_true((0 < 1)):null> |
+| org.apache.spark.sql.catalyst.expressions.AssertTrue | assert_true | SELECT assert_true(0 < 1) | struct<assert_true((0 < 1)):unknown> |
 | org.apache.spark.sql.catalyst.expressions.Atan | atan | SELECT atan(0) | struct<ATAN(CAST(0 AS DOUBLE)):double> |
 | org.apache.spark.sql.catalyst.expressions.Atan2 | atan2 | SELECT atan2(0, 0) | struct<ATAN2(CAST(0 AS DOUBLE), CAST(0 AS DOUBLE)):double> |
 | org.apache.spark.sql.catalyst.expressions.Atanh | atanh | SELECT atanh(0) | struct<ATANH(CAST(0 AS DOUBLE)):double> |
```

sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out

Lines changed: 1 addition & 1 deletion
```diff
@@ -5,7 +5,7 @@
 -- !query
 select null, Null, nUll
 -- !query schema
-struct<NULL:null,NULL:null,NULL:null>
+struct<NULL:unknown,NULL:unknown,NULL:unknown>
 -- !query output
 NULL    NULL    NULL
 
```
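The same rendering change is observable on any DataFrame whose output contains a NULL literal; a sketch:

```scala
spark.sql("SELECT null").schema.simpleString
// before this patch: "struct<NULL:null>"
// after this patch:  "struct<NULL:unknown>"
```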
