Issue 366: Ensure TBLPROPERTIES consistency between _delta_log, catalog, and Qbeast internals (Qbeast-io#367)

osopardo1 authored Jul 30, 2024
1 parent bab1508 commit 71ee262
Showing 8 changed files with 157 additions and 10 deletions.
12 changes: 12 additions & 0 deletions src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
@@ -29,6 +29,18 @@ trait QbeastSnapshot {
*/
def isInitial: Boolean

/**
* The current table description.
* @return
*/
def loadDescription: String

/**
* The current table properties of the snapshot.
* @return
*/
def loadProperties: Map[String, String]

/**
* Load methods
*/
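For orientation, a minimal sketch of how a caller might use the two new accessors; the table path is hypothetical, and the snapshot is obtained through the metadata manager just as QbeastTableImpl does further down:

// Sketch: consuming the new QbeastSnapshot accessors (hypothetical table path).
val tableId = QTableID("/tmp/qbeast-table")
val snapshot = QbeastContext.metadataManager.loadSnapshot(tableId)
val description: String = snapshot.loadDescription
val properties: Map[String, String] = snapshot.loadProperties
properties.foreach { case (key, value) => println(s"$key=$value") }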
17 changes: 17 additions & 0 deletions src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -44,6 +44,22 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)

private val metadataMap: Map[String, String] = snapshot.metadata.configuration

/**
* The current table properties of the snapshot.
*
* We filter out the serialized revision entries, keeping only the revision ID.
*
* @return
*/
override def loadProperties: Map[String, String] =
snapshot.getProperties.filterKeys(k => !k.startsWith(MetadataConfig.revision)).toMap

/**
* The current table description.
* @return
*/
override def loadDescription: String = snapshot.metadata.description

/**
* Constructs revision dictionary
*
@@ -195,4 +211,5 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
* Loads Staging AddFiles
*/
def loadStagingFiles(): Dataset[AddFile] = stagingFiles()

}
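A sketch of what the prefix filter keeps and drops, assuming MetadataConfig.revision resolves to the key prefix "qbeast.revision" (the example configuration map is hypothetical):

// Serialized revisions are dropped; the last revision ID survives the filter.
val configuration = Map(
  "columnsToIndex" -> "id",
  "qbeast.lastRevisionID" -> "1", // kept: not prefixed by "qbeast.revision"
  "qbeast.revision.1" -> "{...}") // dropped by the prefix filter
val visible = configuration.filterKeys(k => !k.startsWith("qbeast.revision")).toMap
// visible == Map("columnsToIndex" -> "id", "qbeast.lastRevisionID" -> "1")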
src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
@@ -293,9 +293,11 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
ifExists = true).run(spark)

// Other cases not handled yet
case _ => return getSessionCatalog().alterTable(ident, changes: _*)
case _ =>
}

// Update session catalog with changes
getSessionCatalog().alterTable(ident, changes: _*)
loadTable(ident)

}
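The key behavioral change: Qbeast-specific handling no longer returns early, so every change also reaches the session catalog. A simplified sketch of the resulting flow (not the full method body):

// Sketch of alterTable after this change (property handling elided).
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
  // SET/UNSET property changes are first applied to the _delta_log (elided).
  // The changes are then always propagated to the session catalog so its
  // metadata stays consistent with the _delta_log.
  getSessionCatalog().alterTable(ident, changes: _*)
  // Reload so the returned Table reflects the updated properties.
  loadTable(ident)
}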
src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImpl.scala
@@ -15,16 +15,20 @@
*/
package io.qbeast.spark.internal.sources.v2

import io.qbeast.context.QbeastContext
import io.qbeast.core.model.QTableID
import io.qbeast.spark.internal.sources.QbeastBaseRelation
import io.qbeast.spark.table.IndexedTableFactory
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.connector.catalog.SupportsWrite
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.connector.write.WriteBuilder
import org.apache.spark.sql.sources.BaseRelation
@@ -33,6 +37,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.V2toV1Fallback

import java.util
import scala.collection.mutable
import scala.collection.JavaConverters._

/**
@@ -65,11 +70,15 @@ case class QbeastTableImpl(

private val indexedTable = tableFactory.getIndexedTable(tableId)

private lazy val spark = SparkSession.active

private lazy val initialSnapshot = QbeastContext.metadataManager.loadSnapshot(tableId)

private lazy val table: CatalogTable =
if (catalogTable.isDefined) catalogTable.get
else {
// Get table Metadata if no catalog table is provided
SparkSession.active.sessionState.catalog
spark.sessionState.catalog
.getTableMetadata(tableIdentifier)
}

@@ -90,7 +99,30 @@ case class QbeastTableImpl(
QbeastBaseRelation.forQbeastTableWithOptions(indexedTable, properties().asScala.toMap)
}

override def properties(): util.Map[String, String] = options.asJava
override def properties(): util.Map[String, String] = {
val description = initialSnapshot.loadDescription
val base = mutable.Map() ++ initialSnapshot.loadProperties
options.foreach {
case (key, value) if !base.contains(key) =>
base.put(key, value)
case _ => // do nothing
}
base.put(TableCatalog.PROP_PROVIDER, "qbeast")
base.put(TableCatalog.PROP_LOCATION, CatalogUtils.URIToString(path.toUri))
catalogTable.foreach { table =>
if (table.owner != null && table.owner.nonEmpty) {
base.put(TableCatalog.PROP_OWNER, table.owner)
}
v1Table.storage.properties.foreach { case (key, value) =>
base.put(TableCatalog.OPTION_PREFIX + key, value)
}
if (v1Table.tableType == CatalogTableType.EXTERNAL) {
base.put(TableCatalog.PROP_EXTERNAL, "true")
}
}
Option(description).foreach(base.put(TableCatalog.PROP_COMMENT, _))
base.asJava
}

override def v1Table: CatalogTable = table

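Assuming a table created with TBLPROPERTIES ('columnsToIndex' = 'id'), the merged map could look as follows; the location, owner, and comment values are hypothetical:

// Hypothetical properties() result for an external Qbeast table:
// Map(
//   "columnsToIndex"        -> "id",           // from the snapshot (_delta_log)
//   "provider"              -> "qbeast",       // TableCatalog.PROP_PROVIDER
//   "location"              -> "file:/tmp/t1", // TableCatalog.PROP_LOCATION
//   "owner"                 -> "alice",        // TableCatalog.PROP_OWNER
//   "option.columnsToIndex" -> "id",           // TableCatalog.OPTION_PREFIX + key
//   "external"              -> "true",         // TableCatalog.PROP_EXTERNAL
//   "comment"               -> "demo table")   // TableCatalog.PROP_COMMENT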
src/test/scala/io/qbeast/spark/internal/sources/v2/QbeastTableImplTest.scala
@@ -84,7 +84,10 @@ class QbeastTableImplTest extends QbeastIntegrationTestSpec with CatalogTestSuit
None,
indexedTableFactory)

qbeastTableImpl.properties() shouldBe properties.asJava
val qbeastTableProperties = qbeastTableImpl.properties()

qbeastTableProperties.get("provider") shouldBe "qbeast"
qbeastTableProperties.get("columnsToIndex") shouldBe "id"

})

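The strict map-equality assertion was dropped because properties() now derives extra entries beyond the user-supplied options; an additional hypothetical assertion one could add, assuming the test warehouse sits on the local filesystem:

// Hypothetical extra assertion on a derived entry; the scheme depends on
// where the test warehouse is located.
qbeastTableProperties.get("location") should startWith("file:")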
src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogIntegrationTest.scala
@@ -16,6 +16,7 @@
package io.qbeast.spark.internal.sources.catalog

import io.qbeast.spark.QbeastIntegrationTestSpec
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.AnalysisException
import org.apache.spark.SparkConf
@@ -238,13 +239,11 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo
it should "persist altered properties on the _delta_log" in withQbeastContextSparkAndTmpWarehouse(
(spark, tmpDir) => {

val df = loadTestData(spark)
df.write.format("qbeast").option("columnsToIndex", "user_id,price").save(tmpDir)
spark.sql(s"CREATE TABLE t1 USING qbeast LOCATION '$tmpDir'")
spark.sql("CREATE TABLE t1(id INT) USING qbeast TBLPROPERTIES ('columnsToIndex'= 'id')")
spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")

// Check the delta log info
val deltaLog = DeltaLog.forTable(spark, tmpDir)
val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
val snapshot = deltaLog.update()
val properties = snapshot.getProperties

@@ -253,4 +252,65 @@ class QbeastCatalogIntegrationTest extends QbeastIntegrationTestSpec with Catalo

})

it should "persist UNSET properties in _delta_catalog" in withQbeastContextSparkAndTmpWarehouse(
(spark, _) => {

spark.sql(
"CREATE TABLE t1(id INT) " +
"USING qbeast " +
"TBLPROPERTIES ('columnsToIndex'= 'id')")

spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")

val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
val snapshot = deltaLog.update()
val properties = snapshot.getProperties

properties should contain key "k"
properties("k") shouldBe "v"

spark.sql("ALTER TABLE t1 UNSET TBLPROPERTIES ('k')")

// Check the delta log info
val updatedProperties = deltaLog.update().getProperties
updatedProperties should not contain key("k")
})

it should "ensure consistency with the session catalog" in withQbeastContextSparkAndTmpWarehouse(
(spark, tmpDir) => {

import spark.implicits._

spark.sql(
"CREATE TABLE t1(id INT) " +
"USING qbeast " +
"TBLPROPERTIES ('columnsToIndex'= 'id')")
spark.sql("ALTER TABLE t1 SET TBLPROPERTIES ('k' = 'v')")
// Check the delta log info
val deltaLog = DeltaLog.forTable(spark, TableIdentifier("t1"))
val catalog = spark.sessionState.catalog
val showProperties = spark.sql("SHOW TBLPROPERTIES t1").as[(String, String)].collect().toMap

val snapshot = deltaLog.update()
val properties = snapshot.getProperties
val catalogProperties =
catalog.getTableMetadata(TableIdentifier("t1")).properties
properties should contain key "k"
catalogProperties should contain key ("k")
showProperties should contain key ("k")

spark.sql("ALTER TABLE t1 UNSET TBLPROPERTIES ('k')")
// Check the delta log info
val updatedSnapshot = deltaLog.update()
val updatedProperties = updatedSnapshot.getProperties
val updatedCatalogProperties =
catalog.getTableMetadata(TableIdentifier("t1")).properties
val updatedShowProperties =
spark.sql("SHOW TBLPROPERTIES t1").as[(String, String)].collect().toMap

updatedProperties should not contain key("k")
updatedCatalogProperties should not contain key("k")
updatedShowProperties should not contain key("k")
})

}
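The same behavior can be exercised interactively; a spark-shell sketch with a hypothetical table name:

// Sketch mirroring the integration tests above (table name hypothetical).
spark.sql("CREATE TABLE events(id INT) USING qbeast TBLPROPERTIES ('columnsToIndex' = 'id')")
spark.sql("ALTER TABLE events SET TBLPROPERTIES ('k' = 'v')")
spark.sql("SHOW TBLPROPERTIES events").show(truncate = false) // 'k' is listed
spark.sql("ALTER TABLE events UNSET TBLPROPERTIES ('k')")
spark.sql("SHOW TBLPROPERTIES events").show(truncate = false) // 'k' is gone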
src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
@@ -145,6 +145,25 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
.asScala should contain("newProperty" -> "newValue")
})

it should "unset properties" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
val qbeastCatalog = createQbeastCatalog(spark)
val tableIdentifier = Identifier.of(defaultNamespace, "student")
qbeastCatalog.createTable(
tableIdentifier,
columns,
Array.empty[Transform],
Map("newProperty" -> "newValue").asJava)

val unsetPropertiesChange = TableChange.removeProperty("newProperty")
// Alter the table to remove the property
qbeastCatalog.alterTable(tableIdentifier, unsetPropertiesChange)

qbeastCatalog
.loadTable(Identifier.of(defaultNamespace, "student"))
.properties()
.asScala should not contain ("newProperty" -> "newValue")
})
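For reference, these are the standard Spark DataSource V2 TableChange helpers behind SET and UNSET TBLPROPERTIES:

// Spark's TableChange factory methods used by ALTER TABLE property statements.
val set: TableChange = TableChange.setProperty("newProperty", "newValue")
val unset: TableChange = TableChange.removeProperty("newProperty")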

it should "drop table" in withQbeastContextSparkAndTmpWarehouse((spark, _) => {
val qbeastCatalog = createQbeastCatalog(spark)
val tableIdentifier = Identifier.of(defaultNamespace, "student")
QbeastSQLIntegrationTest.scala
@@ -55,11 +55,13 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {
.first()
.getString(0) shouldBe "qbeast"
// Check Table Properties
table
val tableProperties = table
.where("col_name == 'Table Properties'")
.select("data_type")
.first()
.getString(0) shouldBe "[columnsToIndex=id,option.columnsToIndex=id]"
.getString(0)
tableProperties should include("columnsToIndex=id")
tableProperties should include("option.columnsToIndex=id")

})

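The equality assertion was replaced with two include checks because the 'Table Properties' cell may carry additional entries in a different order. A sketch of inspecting the same row interactively (table name hypothetical):

// Inspecting the 'Table Properties' row of DESCRIBE TABLE EXTENDED.
spark.sql("DESCRIBE TABLE EXTENDED student")
  .where("col_name == 'Table Properties'")
  .select("data_type")
  .show(truncate = false)
// e.g. [columnsToIndex=id,option.columnsToIndex=id]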
