Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK][TEST-ONLY] Use unique table name for identity column tests #3594

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
rename table
  • Loading branch information
zhipengmao-db committed Aug 22, 2024
commit cd7656d9ac0c4018259154736ab65dbeb93bcef8
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ trait IdentityColumnAdmissionSuiteBase

import testImplicits._

protected val tblName = "identity_admission_test"

protected override def sparkConf: SparkConf = {
super.sparkConf
.set(DeltaSQLConf.DELTA_IDENTITY_COLUMN_ENABLED.key, "true")
Expand All @@ -52,6 +50,7 @@ trait IdentityColumnAdmissionSuiteBase
keyword <- Seq("ALTER", "CHANGE")
targetType <- Seq(IntegerType, DoubleType)
} {
val tblName = getRandomTableName
withIdentityColumnTable(generatedAsIdentityType, tblName) {
targetType match {
case IntegerType =>
Expand All @@ -78,6 +77,7 @@ trait IdentityColumnAdmissionSuiteBase
generatedAsIdentityType <- GeneratedAsIdentityType.values
keyword <- Seq("ALTER", "CHANGE")
} {
val tblName = getRandomTableName
withIdentityColumnTable(generatedAsIdentityType, tblName) {
sql(s"ALTER TABLE $tblName $keyword COLUMN id COMMENT 'comment'")
}
Expand All @@ -88,6 +88,7 @@ trait IdentityColumnAdmissionSuiteBase
for {
generatedAsIdentityType <- GeneratedAsIdentityType.values
} {
val tblName = getRandomTableName
withIdentityColumnTable(generatedAsIdentityType, tblName) {
sql(s"ALTER TABLE $tblName SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')")
sql(s"INSERT INTO $tblName (value) VALUES (1)")
Expand All @@ -102,9 +103,8 @@ trait IdentityColumnAdmissionSuiteBase
}

test("cannot set default value for identity column") {
for {
generatedAsIdentityType <- GeneratedAsIdentityType.values
} {
for (generatedAsIdentityType <- GeneratedAsIdentityType.values) {
val tblName = getRandomTableName
withIdentityColumnTable(generatedAsIdentityType, tblName) {
val ex = intercept[DeltaAnalysisException] {
sql(s"ALTER TABLE $tblName ALTER COLUMN id SET DEFAULT 1")
Expand All @@ -115,9 +115,8 @@ trait IdentityColumnAdmissionSuiteBase
}

test("position of identity column can be moved") {
for {
generatedAsIdentityType <- GeneratedAsIdentityType.values
} {
for (generatedAsIdentityType <- GeneratedAsIdentityType.values) {
val tblName = getRandomTableName
withIdentityColumnTable(generatedAsIdentityType, tblName) {
sql(s"ALTER TABLE $tblName ALTER COLUMN id AFTER value")
sql(s"INSERT INTO $tblName (value) VALUES (1)")
Expand All @@ -133,6 +132,7 @@ trait IdentityColumnAdmissionSuiteBase

test("alter table replace columns") {
for (generatedAsIdentityType <- GeneratedAsIdentityType.values) {
val tblName = getRandomTableName
withIdentityColumnTable(generatedAsIdentityType, tblName) {
val ex = intercept[DeltaAnalysisException] {
sql(s"ALTER TABLE $tblName REPLACE COLUMNS (id BIGINT, value INT)")
Expand All @@ -143,9 +143,8 @@ trait IdentityColumnAdmissionSuiteBase
}

test("create table partitioned by identity column") {
for {
generatedAsIdentityType <- GeneratedAsIdentityType.values
} {
for (generatedAsIdentityType <- GeneratedAsIdentityType.values) {
val tblName = getRandomTableName
withTable(tblName) {
val ex1 = intercept[DeltaAnalysisException] {
createTable(
Expand Down Expand Up @@ -176,9 +175,8 @@ trait IdentityColumnAdmissionSuiteBase
}

test("replace with table partitioned by identity column") {
for {
generatedAsIdentityType <- GeneratedAsIdentityType.values
} {
for (generatedAsIdentityType <- GeneratedAsIdentityType.values) {
val tblName = getRandomTableName
withTable(tblName) {
// First create a table with no identity column and no partitions.
createTable(
Expand Down Expand Up @@ -220,10 +218,9 @@ trait IdentityColumnAdmissionSuiteBase
}

test("CTAS does not inherit IDENTITY column") {
for {
generatedAsIdentityType <- GeneratedAsIdentityType.values
} {
val ctasTblName = "ctasTblName"
for (generatedAsIdentityType <- GeneratedAsIdentityType.values) {
val tblName = getRandomTableName
val ctasTblName = getRandomTableName
withIdentityColumnTable(generatedAsIdentityType, tblName) {
withTable(ctasTblName) {
sql(s"INSERT INTO $tblName (value) VALUES (1), (2)")
Expand All @@ -239,6 +236,7 @@ trait IdentityColumnAdmissionSuiteBase
}

test("insert generated always as") {
val tblName = getRandomTableName
withIdentityColumnTable(GeneratedAlways, tblName) {
// Test SQLs.
val blockedStmts = Seq(
Expand Down Expand Up @@ -267,6 +265,7 @@ trait IdentityColumnAdmissionSuiteBase
}

test("streaming") {
val tblName = getRandomTableName
withIdentityColumnTable(GeneratedAlways, tblName) {
val path = DeltaLog.forTable(spark, TableIdentifier(tblName)).dataPath.toString
withTempDir { checkpointDir =>
Expand All @@ -293,6 +292,7 @@ trait IdentityColumnAdmissionSuiteBase

test("update") {
for (generatedAsIdentityType <- GeneratedAsIdentityType.values) {
val tblName = getRandomTableName
withIdentityColumnTable(generatedAsIdentityType, tblName) {
sql(s"INSERT INTO $tblName (value) VALUES (1), (2)")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,10 @@ trait IdentityColumnConflictSuiteBase
override def sparkConf: SparkConf = super.sparkConf
.set(DeltaSQLConf.DELTA_ROW_TRACKING_BACKFILL_ENABLED.key, "true")

val tblName = "identity_conflict_test"
val colName = "id"

private def setupEmptyTableWithRowTrackingTableFeature(
tblIsoLevel: Option[IsolationLevel]): Unit = {
tblIsoLevel: Option[IsolationLevel], tblName: String): Unit = {
val tblPropertiesMap: Map[String, String] = Map(
TableFeatureProtocolUtils.propertyKey(RowTrackingFeature) -> "supported",
DeltaConfigs.ROW_TRACKING_ENABLED.key -> "false",
Expand Down Expand Up @@ -142,16 +141,17 @@ trait IdentityColumnConflictSuiteBase
currentTxn: TransactionConflictTestCase,
winningTxn: TransactionConflictTestCase,
tblIsoLevel: Option[IsolationLevel]): Unit = {
val tblName = getRandomTableName
withTable(tblName) {
// We start with an empty table that has row tracking table feature support and row tracking
// table property disabled. This way, when we set the table property to true, it will not
// also do a protocol upgrade and we don't need any backfill commit.
setupEmptyTableWithRowTrackingTableFeature(tblIsoLevel)
setupEmptyTableWithRowTrackingTableFeature(tblIsoLevel, tblName)

val threadPool =
ThreadUtils.newDaemonSingleThreadExecutor(threadName = "identity-column-thread-pool")
var (txnObserver, future) = runQueryWithObserver(
name = "current", threadPool, currentTxn.sqlCommand)
name = "current", threadPool, currentTxn.sqlCommand.format(tblName))

// If the current txn is enabling row tracking on an existing table, the first txn is
// a NOOP since there are no files in the table initially. No commit will be made.
Expand All @@ -169,7 +169,7 @@ trait IdentityColumnConflictSuiteBase
unblockUntilPreCommit(txnObserver)
busyWaitFor(txnObserver.phases.preparePhase.hasEntered, timeout)

sql(winningTxn.sqlCommand)
sql(winningTxn.sqlCommand.format(tblName))

val expectedException = expectedExceptionClass(currentTxn, winningTxn)
val events = Log4jUsageLogger.track {
Expand Down Expand Up @@ -211,29 +211,29 @@ trait IdentityColumnConflictSuiteBase
// System generated IDENTITY value will have a metadata update for IDENTITY high water marks.
private val generatedIdTestCase = IdentityOnlyMetadataUpdateTestCase(
name = "generatedId",
sqlCommand = s"INSERT INTO $tblName(value) VALUES (1)",
sqlCommand = s"INSERT INTO %s(value) VALUES (1)",
isAppend = true
)

// SYNC IDENTITY updates the high water mark based on the values in the IDENTITY column.
private val syncIdentityTestCase = IdentityOnlyMetadataUpdateTestCase(
name = "syncIdentity",
sqlCommand = s"ALTER TABLE $tblName ALTER COLUMN $colName SYNC IDENTITY",
sqlCommand = s"ALTER TABLE %s ALTER COLUMN $colName SYNC IDENTITY",
isAppend = false
)

// Explicitly provided IDENTITY value will not generate a metadata update.
private val noMetadataUpdateTestCase =
NoMetadataUpdateTestCase(
name = "noMetadataUpdate",
sqlCommand = s"INSERT INTO $tblName VALUES (1, 1)",
sqlCommand = s"INSERT INTO %s VALUES (1, 1)",
isAppend = true
)

private val rowTrackingEnablementTestCase = RowTrackingEnablementOnlyTestCase(
name = "rowTrackingEnablement",
sqlCommand =
s"""ALTER TABLE $tblName
s"""ALTER TABLE %s
|SET TBLPROPERTIES(
|'${DeltaConfigs.ROW_TRACKING_ENABLED.key}' = 'true'
|)""".stripMargin,
Expand All @@ -242,7 +242,7 @@ trait IdentityColumnConflictSuiteBase

private val otherMetadataUpdateTestCase = GenericMetadataUpdateTestCase(
name = "otherMetadataUpdate",
sqlCommand = s"ALTER TABLE $tblName ADD COLUMN value2 STRING",
sqlCommand = s"ALTER TABLE %s ADD COLUMN value2 STRING",
isAppend = false
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ trait IdentityColumnIngestionSuiteBase extends IdentityColumnTestUtils {

import testImplicits._

private val tblName = "identity_test"
private val tempTblName = "identity_test_temp"
private val tempCsvFileName = "test.csv"

/** Helper function to write a single 'value' column into `sourcePath`. */
Expand Down Expand Up @@ -93,6 +91,7 @@ trait IdentityColumnIngestionSuiteBase extends IdentityColumnTestUtils {
batchSize: Int,
mode: IngestMode.Value): Unit = {
var highWaterMark = start - step
val tblName = getRandomTableName
withTable(tblName) {
createTableWithIdColAndIntValueCol(
tblName, GeneratedAlways, startsWith = Some(start), incrementBy = Some(step))
Expand All @@ -105,6 +104,7 @@ trait IdentityColumnIngestionSuiteBase extends IdentityColumnTestUtils {
val df = (batchStart to batchEnd).toDF("value")
// Used by insertInto, insertIntoSelect, insertOverwrite, insertOverwriteSelect
val insertValues = (batchStart to batchEnd).map(v => s"($v)").mkString(",")
val tempTblName = s"${getRandomTableName}_temp"

mode match {
case IngestMode.appendV1 =>
Expand Down Expand Up @@ -258,6 +258,7 @@ trait IdentityColumnIngestionSuiteBase extends IdentityColumnTestUtils {
}

test("explicit insert should not update high water mark") {
val tblName = getRandomTableName
withIdentityColumnTable(GeneratedByDefault, tblName) {
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tblName))
val schema1 = deltaLog.snapshot.metadata.schemaString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ import org.apache.spark.sql.types._
trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {

import testImplicits._
protected val tblName = "identity_test"

test("Don't allow IDENTITY column in the schema if the feature is disabled") {
val tblName = getRandomTableName
withSQLConf(DeltaSQLConf.DELTA_IDENTITY_COLUMN_ENABLED.key -> "false") {
withTable(tblName) {
val e = intercept[DeltaUnsupportedTableFeatureException] {
Expand Down Expand Up @@ -102,6 +103,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
startsWith <- starts
incrementBy <- steps
} {
val tblName = getRandomTableName
withTable(tblName) {
createTableWithIdColAndIntValueCol(
tblName, generatedAsIdentityType, Some(startsWith), Some(incrementBy))
Expand All @@ -119,6 +121,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
startsWith <- Seq(Some(1L), None)
incrementBy <- Seq(Some(1L), None)
} {
val tblName = getRandomTableName
withTable(tblName) {
createTableWithIdColAndIntValueCol(
tblName, generatedAsIdentityType, startsWith, incrementBy)
Expand All @@ -131,6 +134,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
}

test("logging") {
val tblName = getRandomTableName
withTable(tblName) {
val eventsDefinition = Log4jUsageLogger.track {
createTable(
Expand Down Expand Up @@ -184,6 +188,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
assert(!ColumnWithDefaultExprUtils.hasIdentityColumn(f().schema), s"test $id failed")
}

val tblName = getRandomTableName
withTable(tblName) {
createTable(
tblName,
Expand Down Expand Up @@ -312,6 +317,8 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
test(
"replace table with identity column should upgrade protocol, "
+ s"identityType: $generatedAsIdentityType") {

val tblName = getRandomTableName
def getProtocolVersions: (Int, Int) = {
sql(s"DESC DETAIL $tblName")
.select("minReaderVersion", "minWriterVersion")
Expand Down Expand Up @@ -354,6 +361,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
start <- starts
step <- steps
} {
val tblName = getRandomTableName
withTable(tblName) {
createTableWithIdColAndIntValueCol(
tblName, generatedAsIdentityType, Some(start), Some(step))
Expand All @@ -380,7 +388,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
}

test("restore - positive step") {
val tableName = "identity_test_tgt"
val tableName = getRandomTableName
withTable(tableName) {
generateTableWithIdentityColumn(tableName)
sql(s"RESTORE TABLE $tableName TO VERSION AS OF 3")
Expand All @@ -393,7 +401,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
}

test("restore - negative step") {
val tableName = "identity_test_tgt"
val tableName = getRandomTableName
withTable(tableName) {
generateTableWithIdentityColumn(tableName, step = -1)
sql(s"RESTORE TABLE $tableName TO VERSION AS OF 3")
Expand All @@ -407,6 +415,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {

test("restore - on partitioned table") {
for (generatedAsIdentityType <- GeneratedAsIdentityType.values) {
val tblName = getRandomTableName
withTable(tblName) {
// v0.
createTable(
Expand Down Expand Up @@ -443,11 +452,11 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
}

test("clone") {
val oldTbl = "identity_test_old"
val newTbl = "identity_test_new"
for {
generatedAsIdentityType <- GeneratedAsIdentityType.values
} {
val oldTbl = s"${getRandomTableName}_old"
val newTbl = s"${getRandomTableName}_new"
withIdentityColumnTable(generatedAsIdentityType, oldTbl) {
withTable(newTbl) {
sql(s"INSERT INTO $oldTbl (value) VALUES (1), (2)")
Expand Down Expand Up @@ -479,8 +488,8 @@ class IdentityColumnScalaSuite
with ScalaDDLTestUtils {

test("unsupported column type") {
val tblName = "identity_test"
for (unsupportedType <- unsupportedDataTypes) {
val tblName = getRandomTableName
withTable(tblName) {
val ex = intercept[DeltaUnsupportedOperationException] {
createTable(
Expand All @@ -498,11 +507,11 @@ class IdentityColumnScalaSuite
}

test("unsupported step") {
val tblName = "identity_test"
for {
generatedAsIdentityType <- GeneratedAsIdentityType.values
startsWith <- Seq(Some(1L), None)
} {
val tblName = getRandomTableName
withTable(tblName) {
val ex = intercept[DeltaAnalysisException] {
createTableWithIdColAndIntValueCol(
Expand Down
Loading
Loading