[SC-69842] Migrating Delta to use Spark 3.1.1
## What changes were proposed in this pull request?
 - Migrate to Spark 3.1.1, adding tests and refactoring code where the Spark APIs changed

## How was this patch tested?
 - Existing tests

Author: Pranav Anand <anandpranavv@gmail.com>

#19552 is resolved by pranavanand/pa-delta311migration.

GitOrigin-RevId: fd3b86468f07752fbd3d56f653aec683af05d0b4
pranavanand authored and mengtong-db committed Apr 6, 2021
1 parent 73ade72 commit 22e4739
Showing 21 changed files with 518 additions and 185 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
@@ -21,7 +21,7 @@ jobs:
pyenv install 3.7.4
pyenv global system 3.7.4
pipenv --python 3.7 install
-pipenv run pip install pyspark==3.0.1
+pipenv run pip install pyspark==3.1.1
pipenv run pip install flake8==3.5.0 pypandoc==1.3.3
- run:
name: Run Scala/Java and Python tests
2 changes: 1 addition & 1 deletion Dockerfile
@@ -2,7 +2,7 @@ FROM python:3.7.3-stretch

RUN apt-get update && apt-get -y install openjdk-8-jdk

-RUN pip install pyspark==3.0.1
+RUN pip install pyspark==3.1.1

COPY . /usr/src/delta

4 changes: 2 additions & 2 deletions build.sbt
@@ -20,7 +20,7 @@ organization := "io.delta"

scalaVersion := "2.12.10"

-sparkVersion := "3.0.1"
+sparkVersion := "3.1.1"

libraryDependencies ++= Seq(
// Adding test classifier seems to break transitive resolution of the core dependencies
@@ -30,7 +30,7 @@ libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",

// Test deps
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"org.scalatest" %% "scalatest" % "3.1.0" % "test",
"junit" % "junit" % "4.12" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
11 changes: 1 addition & 10 deletions src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -132,8 +132,6 @@ class DeltaSqlParser(val delegate: ParserInterface) extends ParserInterface {
override def parseTableSchema(sqlText: String): StructType = delegate.parseTableSchema(sqlText)

override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText)

-override def parseRawDataType(sqlText: String): DataType = delegate.parseRawDataType(sqlText)
}

/**
@@ -208,16 +206,9 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {

val builder = new MetadataBuilder

-// Add Hive type string to metadata.
-val rawDataType = typedVisit[DataType](ctx.dataType)
-val cleanedDataType = HiveStringType.replaceCharType(rawDataType)
-if (rawDataType != cleanedDataType) {
-  builder.putString(HIVE_TYPE_STRING, rawDataType.catalogString)
-}
-
StructField(
ctx.colName.getText,
-cleanedDataType,
+typedVisit[DataType](ctx.dataType),
nullable = NOT == null,
builder.build())
}
3 changes: 3 additions & 0 deletions src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -350,6 +350,9 @@ class DeltaAnalysis(session: SparkSession, conf: SQLConf)

private def stripTempViewWrapper(plan: LogicalPlan): LogicalPlan = {
plan
+.transformUp {
+  case v: View if v.isTempView => v.child
+}
}
}

@@ -24,7 +24,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, V2WriteCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DropTable, LogicalPlan, OverwriteByExpression, V2WriteCommand}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

@@ -107,6 +107,11 @@ case class DeltaUnsupportedOperationsCheck(spark: SparkSession)
case overwrite: OverwriteByExpression =>
checkDeltaTableExists(overwrite, "OVERWRITE")

+case _: DropTable =>
+  // For Delta tables being dropped, we do not need the underlying Delta log to exist so this is
+  // OK
+  return

case DataSourceV2Relation(tbl: DeltaTableV2, _, _, _, _) if !tbl.deltaLog.tableExists =>
throw DeltaErrors.pathNotExistsException(tbl.deltaLog.dataPath.toString)

@@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.SQLConf
/**
* Preprocess the [[DeltaDelete]] plan to convert to [[DeleteCommand]].
*/
-case class PreprocessTableDelete(conf: SQLConf) extends Rule[LogicalPlan] {
+case class PreprocessTableDelete(sqlConf: SQLConf) extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperators {
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StructField, StructType}

-case class PreprocessTableMerge(conf: SQLConf)
+case class PreprocessTableMerge(override val conf: SQLConf)
extends Rule[LogicalPlan] with UpdateExpressionsSupport {

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
@@ -31,9 +31,11 @@ import org.apache.spark.sql.internal.SQLConf
* into account that the specified SET clause may only update some columns or nested fields of
* columns.
*/
-case class PreprocessTableUpdate(conf: SQLConf)
+case class PreprocessTableUpdate(sqlConf: SQLConf)
extends Rule[LogicalPlan] with UpdateExpressionsSupport {

+override def conf: SQLConf = sqlConf

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case u: DeltaUpdateTable if u.resolved =>
u.condition.foreach { cond =>
@@ -19,19 +19,16 @@ package org.apache.spark.sql.delta
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf

+import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{CastSupport, Resolver}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, CreateNamedStruct, Expression, ExtractValue, GetStructField, Literal, NamedExpression}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
* Trait with helper functions to generate expressions to update target columns, even if they are
* nested fields.
*/
-trait UpdateExpressionsSupport extends CastSupport {
-
-def conf: SQLConf
-
+trait UpdateExpressionsSupport extends CastSupport with SQLConfHelper {
/**
* Specifies an operation that updates a target column with the given expression.
* The target column may or may not be a nested field and it is specified as a full quoted name
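Note on the conf plumbing above: PreprocessTableDelete, PreprocessTableMerge, PreprocessTableUpdate and UpdateExpressionsSupport all follow one pattern. Spark 3.1 introduces org.apache.spark.sql.catalyst.SQLConfHelper, which already supplies a `conf` member, so a rule can no longer declare a plain `conf: SQLConf` constructor parameter without clashing with the inherited one. Below is a minimal sketch of the two resolutions used here, assuming Spark 3.1.x on the classpath; the names ConfAwareSupport, OverridingConfRule and RenamedConfRule are illustrative only, not Delta code.

```scala
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf

// Helper traits no longer declare an abstract `def conf`; SQLConfHelper provides it.
trait ConfAwareSupport extends SQLConfHelper {
  protected def caseSensitive: Boolean = conf.caseSensitiveAnalysis
}

// Option 1 (cf. PreprocessTableMerge): the constructor parameter overrides `conf` directly.
case class OverridingConfRule(override val conf: SQLConf)
  extends Rule[LogicalPlan] with ConfAwareSupport {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Option 2 (cf. PreprocessTableUpdate): rename the parameter and override the accessor.
case class RenamedConfRule(sqlConf: SQLConf)
  extends Rule[LogicalPlan] with ConfAwareSupport {
  override def conf: SQLConf = sqlConf
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}
```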
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
-import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table, TableCapability, TableCatalog, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsOverwrite, SupportsTruncate, V1WriteBuilder, WriteBuilder}
@@ -55,6 +55,7 @@ case class DeltaTableV2(
options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty())
extends Table
with SupportsWrite
+with V2TableWithV1Fallback
with DeltaLogging {

private lazy val (rootPath, partitionFilters, timeTravelByPath) = {
@@ -157,6 +158,12 @@ case class DeltaTableV2(
this
}
}

+override def v1Table: CatalogTable = {
+  catalogTable.getOrElse {
+    throw new IllegalStateException("v1Table call is not expected with path based DeltaTableV2")
+  }
+}
}

private class WriteIntoDeltaBuilder(
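For context on the DeltaTableV2 change above: V2TableWithV1Fallback is a Spark 3.1 connector-catalog interface whose single member, v1Table, lets a DSv2 table hand Spark a V1 CatalogTable when an operation still runs through the V1 code path. A rough sketch of the contract follows, assuming the Spark 3.1 API (the trait is Spark-internal, hence the org.apache.spark.sql sub-package); CatalogBackedTable is a made-up name, not part of Delta.

```scala
package org.apache.spark.sql.delta.sketch

import java.util.Collections

import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.connector.catalog.{Table, TableCapability, V2TableWithV1Fallback}
import org.apache.spark.sql.types.StructType

// Minimal table that can fall back to V1 metadata when it was loaded from a catalog.
class CatalogBackedTable(catalogTable: Option[CatalogTable])
  extends Table with V2TableWithV1Fallback {

  override def name(): String =
    catalogTable.map(_.identifier.unquotedString).getOrElse("<path-based>")
  override def schema(): StructType = new StructType()
  override def capabilities(): java.util.Set[TableCapability] = Collections.emptySet()

  // Only catalog-backed tables have V1 metadata; path-based tables do not, which is
  // why DeltaTableV2 throws in that case.
  override def v1Table: CatalogTable = catalogTable.getOrElse {
    throw new IllegalStateException("No V1 catalog metadata for a path-based table")
  }
}
```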
@@ -238,7 +238,8 @@ case class CreateDeltaTableCommand(
// overwrite any previous data)
if (fs.exists(path) && fs.listStatus(path).nonEmpty) {
throw new AnalysisException(s"Cannot create table ('${tableWithLocation.identifier}')." +
s" The associated location ('${tableWithLocation.location}') is not empty.")
s" The associated location ('${tableWithLocation.location}') is not empty but " +
s"it's not a Delta table")
}
}

@@ -73,7 +73,7 @@ class DeltaDataSource
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
-if (schema.nonEmpty) {
+if (schema.nonEmpty && schema.get.nonEmpty) {
throw DeltaErrors.specifySchemaAtReadTimeException
}
val path = parameters.getOrElse("path", {
@@ -83,6 +83,9 @@ class DeltaDataSource
val (_, maybeTimeTravel) = DeltaTableUtils.extractIfPathContainsTimeTravel(
sqlContext.sparkSession, path)
if (maybeTimeTravel.isDefined) throw DeltaErrors.timeTravelNotSupportedException
+if (DeltaDataSource.getTimeTravelVersion(parameters).isDefined) {
+  throw DeltaErrors.timeTravelNotSupportedException
+}

val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path)
if (deltaLog.snapshot.schema.isEmpty) {
@@ -99,7 +102,7 @@ class DeltaDataSource
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
-if (schema.nonEmpty) {
+if (schema.nonEmpty && schema.get.nonEmpty) {
throw DeltaErrors.specifySchemaAtReadTimeException
}
val path = parameters.getOrElse("path", {
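On the two schema guards above changing from `schema.nonEmpty` to `schema.nonEmpty && schema.get.nonEmpty`: the likely reason is that Spark 3.1 can pass the source an empty user-specified StructType instead of None when no schema was given, so only a schema that actually declares fields should be rejected. A small illustration of the equivalent predicate; assertNoUserSchema is a hypothetical helper, not Delta code (the real methods throw DeltaErrors.specifySchemaAtReadTimeException).

```scala
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical helper, equivalent to `schema.nonEmpty && schema.get.nonEmpty`:
// reject only a schema with fields; tolerate None and an empty StructType.
def assertNoUserSchema(schema: Option[StructType]): Unit = {
  if (schema.exists(_.nonEmpty)) {
    throw new IllegalArgumentException("Delta does not support a user-specified read schema")
  }
}

assertNoUserSchema(None)                   // ok
assertNoUserSchema(Some(new StructType())) // ok: an empty schema passes through
// assertNoUserSchema(Some(new StructType().add(StructField("id", LongType)))) // would throw
```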
10 changes: 0 additions & 10 deletions src/test/scala/org/apache/spark/sql/delta/DeleteSQLSuite.scala
@@ -51,14 +51,4 @@ class DeleteSQLSuite extends DeleteSuiteBase with DeltaSQLCommandTest {
}
}

test("delete from a SQL temp view") {
withTable("tab") {
withTempView("v") {
Seq((1, 1), (0, 3), (1, 5)).toDF("key", "value").write.format("delta").saveAsTable("tab")
sql("CREATE TEMP VIEW v AS SELECT * FROM tab")
sql("DELETE FROM v WHERE key = 1 AND VALUE = 5")
checkAnswer(spark.table("tab"), Seq(Row(1, 1), Row(0, 3)))
}
}
}
}
28 changes: 0 additions & 28 deletions src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
@@ -395,34 +395,6 @@ abstract class DeltaDDLTestBase extends QueryTest with SQLTestUtils {
}
}

test("SHOW CREATE TABLE should not include OPTIONS except for path") {
withTable("delta_test") {
sql(s"""
|CREATE TABLE delta_test(a LONG, b String)
|USING delta
""".stripMargin)

val statement = sql("SHOW CREATE TABLE delta_test").collect()(0).getString(0)
assert(!statement.contains("OPTION"))
}

withTempDir { dir =>
withTable("delta_test") {
val path = dir.getCanonicalPath()
sql(s"""
|CREATE TABLE delta_test(a LONG, b String)
|USING delta
|LOCATION '$path'
""".stripMargin)

val statement = sql("SHOW CREATE TABLE delta_test").collect()(0).getString(0)
assert(statement.contains(
s"LOCATION '${CatalogUtils.URIToString(makeQualifiedPath(path))}'"))
assert(!statement.contains("OPTION"))
}
}
}

test("DESCRIBE TABLE for partitioned table") {
withTempDir { dir =>
withTable("delta_test") {
