Change method name, aggregate code, add verifyAndMergeProperties method
osopardo1 authored and osopardo1 committed Dec 12, 2023
1 parent 7be92aa commit cd03beb
Showing 6 changed files with 195 additions and 128 deletions.
@@ -33,11 +33,11 @@ object QbeastBaseRelation {
*/
def createRelation(
sqlContext: SQLContext,
table: IndexedTable,
indexedTable: IndexedTable,
options: Map[String, String]): BaseRelation = {

val spark = SparkSession.active
val tableID = table.tableID
val tableID = indexedTable.tableID
val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID)
val schema = QbeastContext.metadataManager.loadCurrentSchema(tableID)
if (snapshot.isInitial) {
@@ -52,22 +52,19 @@ object QbeastBaseRelation {
new ParquetFileFormat(),
options)(spark) with InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit = {
table.save(data, options, append = !overwrite)
indexedTable.save(data, options, append = !overwrite)
}
}
} else {
// If the table contains data, initialize it
val revision = snapshot.loadLatestRevision
val columnsToIndex = revision.columnTransformers.map(row => row.columnName).mkString(",")
val cubeSize = revision.desiredCubeSize
val parameters =
Map[String, String]("columnsToIndex" -> columnsToIndex, "cubeSize" -> cubeSize.toString())

val path = new Path(tableID.id)
val fileIndex = OTreeIndex(spark, path)
val bucketSpec: Option[BucketSpec] = None
val file = new ParquetFileFormat()

// Verify and merge options with the existing index properties
val parameters = indexedTable.verifyAndMergeProperties(options)

new HadoopFsRelation(
fileIndex,
partitionSchema = StructType(Seq.empty[StructField]),
@@ -76,7 +73,7 @@
file,
parameters)(spark) with InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit = {
table.save(data, parameters, append = !overwrite)
indexedTable.save(data, parameters, append = !overwrite)
}
}
}
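With this change, the relation for an already-populated table no longer rebuilds columnsToIndex and cubeSize by hand from the latest revision; it asks the IndexedTable to merge them with whatever the caller passed. A minimal sketch of the intended behaviour, assuming a hypothetical table previously indexed on "user_id,ts" with a cube size of 5000 (these values are illustrative, not from the commit):

    // User-supplied options win; anything missing is filled in from the index metadata.
    val readOptions = Map("cubeSize" -> "10000")
    val parameters = indexedTable.verifyAndMergeProperties(readOptions)
    // parameters == Map("cubeSize" -> "10000", "columnsToIndex" -> "user_id,ts")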
@@ -4,7 +4,6 @@
package io.qbeast.spark.internal.sources

import io.qbeast.context.QbeastContext
import io.qbeast.context.QbeastContext.metadataManager
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
import io.qbeast.spark.table.IndexedTableFactory
@@ -58,11 +57,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
val indexedTable = tableFactory.getIndexedTable(tableId)
if (indexedTable.exists) {
// If the table exists, we make sure to pass all the properties to QbeastTableImpl
val currentRevision = metadataManager.loadSnapshot(tableId).loadLatestRevision
val indexProperties = Map(
"columnsToIndex" -> currentRevision.columnTransformers.map(_.columnName).mkString(","),
"cubeSize" -> currentRevision.desiredCubeSize.toString)
val tableProperties = properties.asScala.toMap ++ indexProperties
val tableProperties = indexedTable.verifyAndMergeProperties(properties.asScala.toMap)
new QbeastTableImpl(
TableIdentifier(tableId.id),
new Path(tableId.id),
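The practical effect is that an existing Qbeast table can be read or appended to without repeating its indexing options, since getTable now recovers them through verifyAndMergeProperties. A hedged usage sketch, with a hypothetical path and columns:

    // First write defines the index; later reads and appends may omit the options.
    df.write
      .format("qbeast")
      .option("columnsToIndex", "user_id,ts") // hypothetical columns
      .save("/tmp/qbeast_table")              // hypothetical path

    spark.read.format("qbeast").load("/tmp/qbeast_table") // options merged from metadata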
@@ -3,11 +3,10 @@
*/
package io.qbeast.spark.internal.sources.catalog

import io.qbeast.context.QbeastContext.metadataManager
import io.qbeast.context.QbeastContext.{metadataManager}
import io.qbeast.core.model.QTableID
import io.qbeast.spark.internal.QbeastOptions._
import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
import io.qbeast.spark.table.{IndexedTableFactory}
import io.qbeast.spark.table.IndexedTableFactory
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
@@ -19,7 +18,6 @@ import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{
AnalysisException,
AnalysisExceptionFactory,
DataFrame,
SaveMode,
@@ -181,21 +179,7 @@ object QbeastCatalogUtils {

// Process the parameters/options/configuration sent to the table
val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString))
val isNewQbeastTable = (tableType == CatalogTableType.EXTERNAL) && !indexedTable.isConverted
if (indexedTable.exists && isNewQbeastTable) {
throw AnalysisExceptionFactory.create(
"The table you are trying to create already exists and is NOT Qbeast Formatted. " +
"Please use the ConvertToQbeastCommand before creating the table.")
}
val allProperties = {
try {
checkQbeastOptions(properties)
properties // No options added
} catch {
case _: AnalysisException => // Add existing table properties
properties ++ indexedTable.properties // Add the write options
}
}
val allProperties = indexedTable.verifyAndMergeProperties(properties)

// Initialize the path option
val storage = DataSource
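Note that the explicit isConverted check and AnalysisException handling removed here are not lost: the same guard now lives inside verifyAndMergeProperties, which throws when the location already holds data without Qbeast metadata. A sketch of that failure path, assuming a hypothetical existing Delta table that was never written with Qbeast:

    val indexedTable = tableFactory.getIndexedTable(QTableID("/tmp/plain_delta_table"))
    indexedTable.verifyAndMergeProperties(Map("columnsToIndex" -> "id"))
    // => AnalysisException: "Table /tmp/plain_delta_table exists but does not contain Qbeast
    //    metadata. Please use ConvertToQbeastCommand to convert the table to Qbeast."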
111 changes: 55 additions & 56 deletions src/main/scala/io/qbeast/spark/table/IndexedTable.scala
@@ -14,7 +14,7 @@ import org.apache.spark.qbeast.config.DEFAULT_NUMBER_OF_RETRIES
import org.apache.spark.sql.delta.actions.FileAction
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisExceptionFactory, DataFrame}
import org.apache.spark.sql.{AnalysisException, AnalysisExceptionFactory, DataFrame}

import java.util.ConcurrentModificationException

@@ -31,10 +31,10 @@ trait IndexedTable {
def exists: Boolean

/**
* Returns whether the table is converted to Qbeast format.
* Returns whether the table contains Qbeast metadata.
* @return
*/
def isConverted: Boolean
def containsQbeastMetadata: Boolean

/**
* Returns the table id which identifies the table.
@@ -44,11 +44,11 @@
def tableID: QTableID

/**
* Returns the table properties.
*
* @return the table properties
* Merges the given properties with the current index properties
* @param properties the properties to merge
* @return
*/
def properties: Map[String, String]
def verifyAndMergeProperties(properties: Map[String, String]): Map[String, String]

/**
* Saves given data in the table and updates the index. The specified columns are
@@ -152,31 +152,59 @@ private[table] class IndexedTableImpl(
with StagingUtils {
private var snapshotCache: Option[QbeastSnapshot] = None

/**
* The latest Revision available in the snapshot
*
* @return
*/
private def latestRevision: Revision = snapshot.loadLatestRevision

override def exists: Boolean = !snapshot.isInitial

override def isConverted: Boolean = try {
override def containsQbeastMetadata: Boolean = try {
snapshot.loadLatestRevision
true
} catch {
case _: Exception => false
case _: AnalysisException => false
}

override def properties: Map[String, String] = {
if (exists && isConverted) {
val latestRevision = snapshot.loadLatestRevision
Map(
COLUMNS_TO_INDEX -> latestRevision.columnTransformers
.map(_.columnName)
.mkString(","),
CUBE_SIZE -> latestRevision.desiredCubeSize.toString)
override def verifyAndMergeProperties(properties: Map[String, String]): Map[String, String] = {
if (!exists) { // If the table does not exist yet, only validate the new properties
checkQbeastOptions(properties)
properties
} else if (containsQbeastMetadata) { // If the table exists with Qbeast metadata, merge the new and current properties
val currentColumnsIndexed =
latestRevision.columnTransformers.map(_.columnName).mkString(",")
val currentCubeSize = latestRevision.desiredCubeSize.toString
val finalProperties = {
(properties.contains(COLUMNS_TO_INDEX), properties.contains(CUBE_SIZE)) match {
case (true, true) => properties
case (false, false) =>
properties + (COLUMNS_TO_INDEX -> currentColumnsIndexed, CUBE_SIZE -> currentCubeSize)
case (true, false) => properties + (CUBE_SIZE -> currentCubeSize)
case (false, true) =>
properties + (COLUMNS_TO_INDEX -> currentColumnsIndexed)
}
}
finalProperties
} else {
Map.empty
throw AnalysisExceptionFactory.create(
s"Table ${tableID.id} exists but does not contain Qbeast metadata. " +
s"Please use ConvertToQbeastCommand to convert the table to Qbeast.")
}
}
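The merge itself is a small decision table over which of the two options the caller supplied. A self-contained sketch of the same rule, useful for reasoning about the four cases (names and key strings are illustrative, not part of the commit):

    // Standalone illustration of the merge rule used above.
    def mergeIndexOptions(
        user: Map[String, String],
        currentCols: String,
        currentCubeSize: String): Map[String, String] =
      (user.contains("columnsToIndex"), user.contains("cubeSize")) match {
        case (true, true)  => user // caller supplied both: keep as-is
        case (true, false) => user + ("cubeSize" -> currentCubeSize)
        case (false, true) => user + ("columnsToIndex" -> currentCols)
        case (false, false) =>
          user + ("columnsToIndex" -> currentCols) + ("cubeSize" -> currentCubeSize)
      }

    // mergeIndexOptions(Map("cubeSize" -> "10000"), "a,b", "5000")
    //   == Map("cubeSize" -> "10000", "columnsToIndex" -> "a,b")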

private def isNewRevision(qbeastOptions: QbeastOptions, latestRevision: Revision): Boolean = {
private def isNewRevision(qbeastOptions: QbeastOptions): Boolean = {

// TODO feature: columnsToIndex may change between revisions
checkColumnsToMatchSchema(latestRevision)
val columnsToIndex = qbeastOptions.columnsToIndex
val currentColumnsToIndex = latestRevision.columnTransformers.map(_.columnName)
val isNewColumns = !latestRevision.matchColumns(columnsToIndex)
if (isNewColumns) {
throw AnalysisExceptionFactory.create(
s"Columns to index '${columnsToIndex.mkString(",")}' do not match " +
s"existing index ${currentColumnsToIndex.mkString(",")}.")
}
// Checks if the desiredCubeSize is different from the existing one
val isNewCubeSize = latestRevision.desiredCubeSize != qbeastOptions.cubeSize
// Checks if the user-provided column boundaries would trigger the creation of
@@ -202,43 +230,22 @@

}

/**
* Add the required indexing parameters when the SaveMode is Append.
* The user-provided parameters are respected.
* @param latestRevision the latest revision
* @param parameters the parameters required for indexing
*/
private def addRequiredParams(
latestRevision: Revision,
parameters: Map[String, String]): Map[String, String] = {
val columnsToIndex = latestRevision.columnTransformers.map(_.columnName).mkString(",")
val desiredCubeSize = latestRevision.desiredCubeSize.toString
(parameters.contains(COLUMNS_TO_INDEX), parameters.contains(CUBE_SIZE)) match {
case (true, true) => parameters
case (false, false) =>
parameters + (COLUMNS_TO_INDEX -> columnsToIndex, CUBE_SIZE -> desiredCubeSize)
case (true, false) => parameters + (CUBE_SIZE -> desiredCubeSize)
case (false, true) => parameters + (COLUMNS_TO_INDEX -> columnsToIndex)
}
}

override def save(
data: DataFrame,
parameters: Map[String, String],
append: Boolean): BaseRelation = {
val indexStatus =
if (exists && append) {
// If the table exists and we are appending new data
// If the indexedTable exists and we are appending new data
// 1. Load existing IndexStatus
val latestRevision = snapshot.loadLatestRevision
val updatedParameters = addRequiredParams(latestRevision, parameters)
val updatedProperties = verifyAndMergeProperties(parameters)
if (isStaging(latestRevision)) { // If the existing Revision is Staging
IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, updatedParameters))
IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, updatedProperties))
} else {
if (isNewRevision(QbeastOptions(updatedParameters), latestRevision)) {
if (isNewRevision(QbeastOptions(updatedProperties))) {
// If the new parameters generate a new revision, we need to create another one
val newPotentialRevision = revisionBuilder
.createNewRevision(tableID, data.schema, updatedParameters)
.createNewRevision(tableID, data.schema, updatedProperties)
val newRevisionCubeSize = newPotentialRevision.desiredCubeSize
// Merge new Revision Transformations with old Revision Transformations
val newRevisionTransformations =
@@ -288,16 +295,8 @@
snapshotCache = None
}

private def checkColumnsToMatchSchema(revision: Revision): Unit = {
val columnsToIndex = revision.columnTransformers.map(_.columnName)
if (!snapshot.loadLatestRevision.matchColumns(columnsToIndex)) {
throw AnalysisExceptionFactory.create(
s"Columns to index '$columnsToIndex' do not match existing index.")
}
}

/**
* Creates a QbeastBaseRelation for the given table.
* Creates a QbeastBaseRelation for the given indexedTable.
* @return the QbeastBaseRelation
*/
private def createQbeastBaseRelation(): BaseRelation = {
@@ -415,7 +414,7 @@ private[table] class IndexedTableImpl(
val currentIndexStatus = snapshot.loadIndexStatus(revisionID)

metadataManager.updateWithTransaction(tableID, schema, append = true) {
// There are no affected table changes on compaction, so we send an empty object
// There are no affected indexedTable changes on compaction, so we send an empty object
val tableChanges = BroadcastedTableChanges(None, currentIndexStatus, Map.empty)
val fileActions =
dataWriter.compact(tableID, schema, currentIndexStatus, tableChanges)