Skip to content

Commit

Permalink
Remove unnecessary code, add properties method to IndexedTable
Browse files Browse the repository at this point in the history
  • Loading branch information
osopardo1 authored and osopardo1 committed Dec 11, 2023
1 parent 7f015b0 commit 2369467
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 61 deletions.
6 changes: 0 additions & 6 deletions src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ object QbeastOptions {
val PATH = "path"
val STATS = "columnStats"

private final val keys = Seq(COLUMNS_TO_INDEX, CUBE_SIZE, PATH, STATS)

/**
* Gets the columns to index from the options
* @param options the options passed on the dataframe
Expand Down Expand Up @@ -99,10 +97,6 @@ object QbeastOptions {
}))
}

/**
 * Keeps only the entries of the given options whose key is one of the
 * Qbeast option names listed in `keys`.
 *
 * @param options the options passed on the dataframe
 * @return the subset of `options` whose keys are Qbeast option names
 */
def loadQbeastOptions(options: Map[String, String]): Map[String, String] = {
  // Filter on the entry key. Passing `keys.contains` directly as the predicate
  // makes it receive the whole (key, value) tuple, which can never be equal to
  // one of the String key names, so every entry would be dropped.
  options.filter { case (key, _) => keys.contains(key) }
}

def checkQbeastOptions(options: Map[String, String]): Unit = {
require(
options.contains("columnsToIndex") || options.contains("columnstoindex"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,9 @@ package io.qbeast.spark.internal.sources.catalog

import io.qbeast.context.QbeastContext.metadataManager
import io.qbeast.core.model.QTableID
import io.qbeast.spark.internal.QbeastOptions.{
COLUMNS_TO_INDEX,
CUBE_SIZE,
checkQbeastOptions,
loadQbeastOptions
}
import io.qbeast.spark.internal.QbeastOptions._
import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
import io.qbeast.spark.table.{IndexedTable, IndexedTableFactory}
import io.qbeast.spark.table.{IndexedTableFactory}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
Expand All @@ -24,6 +19,7 @@ import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{
AnalysisException,
AnalysisExceptionFactory,
DataFrame,
SaveMode,
Expand Down Expand Up @@ -137,51 +133,6 @@ object QbeastCatalogUtils {
}
}

/**
 * Resolves the Qbeast options required to create a table.
 *
 * For an EXTERNAL table that already exists physically:
 *  - if it is not converted, the creation is rejected with an analysis error;
 *  - if it is converted and no columns to index were given, the columns to index
 *    and the cube size are taken from the latest revision of the table;
 *  - otherwise the given properties are checked and loaded.
 *
 * In every other case the given properties are checked and loaded.
 *
 * @param tableType the table type
 * @param indexedTable the indexed table
 * @param properties the properties
 * @return the Qbeast options resolved for the table
 */
private def loadQbeastRequiredOptions(
    tableType: CatalogTableType,
    indexedTable: IndexedTable,
    properties: Map[String, String]): Map[String, String] = {

  // The option name is accepted both in camel case and in lower case
  val hasColumnsToIndex =
    Seq("columnsToIndex", "columnstoindex").exists(name => properties.contains(name))
  // The physical existence is only probed for EXTERNAL tables
  val externalAndPresent = tableType == CatalogTableType.EXTERNAL && indexedTable.exists

  if (!externalAndPresent) {
    // Nothing to recover from storage: validate what the user provided
    checkQbeastOptions(properties)
    loadQbeastOptions(properties)
  } else if (!indexedTable.isConverted) {
    // The existing data is not Qbeast formatted
    throw AnalysisExceptionFactory.create(
      "The table you are trying to create is not Qbeast Formatted. " +
        "Please specify the columns to index and the cube size in the options. " +
        "You can also use the ConvertToQbeastCommand before creating the table.")
  } else if (hasColumnsToIndex) {
    // Converted table with user-provided Qbeast properties: validate them
    checkQbeastOptions(properties)
    loadQbeastOptions(properties)
  } else {
    // Converted table without Qbeast properties: recover them from the latest revision
    val revision = metadataManager.loadSnapshot(indexedTable.tableID).loadLatestRevision
    val indexedColumns = revision.columnTransformers.map(_.columnName).mkString(",")
    Map(COLUMNS_TO_INDEX -> indexedColumns, CUBE_SIZE -> revision.desiredCubeSize.toString)
  }
}

/**
* Creates a Table on the Catalog
* @param ident the Identifier of the table
Expand Down Expand Up @@ -230,8 +181,18 @@ object QbeastCatalogUtils {

// Process the parameters/options/configuration sent to the table
val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString))
val allProperties =
properties ++ loadQbeastRequiredOptions(tableType, indexedTable, properties)
val allProperties = {
if (tableType == CatalogTableType.EXTERNAL && !indexedTable.isConverted)
throw AnalysisExceptionFactory.create("This Table is not converted")
else {
try {
checkQbeastOptions(properties)
properties // No options added
} catch {
case _: AnalysisException => properties ++ indexedTable.properties
}
}
}

// Initialize the path option
val storage = DataSource
Expand Down
22 changes: 21 additions & 1 deletion src/main/scala/io/qbeast/spark/table/IndexedTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import io.qbeast.core.model._
import io.qbeast.spark.delta.{CubeDataLoader, StagingDataManager, StagingResolution}
import io.qbeast.spark.index.QbeastColumns
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.internal.QbeastOptions.{COLUMNS_TO_INDEX, CUBE_SIZE}
import io.qbeast.spark.internal.QbeastOptions._
import io.qbeast.spark.internal.sources.QbeastBaseRelation
import org.apache.spark.qbeast.config.DEFAULT_NUMBER_OF_RETRIES
import org.apache.spark.sql.delta.actions.FileAction
Expand Down Expand Up @@ -43,6 +43,13 @@ trait IndexedTable {
*/
def tableID: QTableID

/**
 * Returns the table properties.
 *
 * In the implementation provided by IndexedTableImpl, the properties are the
 * columns to index (as a comma-separated list of column names) and the desired
 * cube size of the latest revision, keyed by the COLUMNS_TO_INDEX and CUBE_SIZE
 * option names. An empty map is returned when the table does not exist or is
 * not converted.
 *
 * @return the table properties
 */
def properties: Map[String, String]

/**
* Saves given data in the table and updates the index. The specified columns are
* used to define the index when the table is created or overwritten. The append
Expand Down Expand Up @@ -154,6 +161,19 @@ private[table] class IndexedTableImpl(
case _: Exception => false
}

/**
 * Returns the Qbeast properties of the table taken from its latest revision.
 *
 * @return the columns to index (comma separated) and the desired cube size when
 *         the table exists and is converted, an empty map otherwise
 */
override def properties: Map[String, String] = {
  val hasQbeastMetadata = exists && isConverted
  if (!hasQbeastMetadata) {
    Map.empty
  } else {
    val revision = snapshot.loadLatestRevision
    val indexedColumns = revision.columnTransformers.map(_.columnName).mkString(",")
    val cubeSize = revision.desiredCubeSize.toString
    Map(COLUMNS_TO_INDEX -> indexedColumns, CUBE_SIZE -> cubeSize)
  }
}

private def isNewRevision(qbeastOptions: QbeastOptions, latestRevision: Revision): Boolean = {
// TODO feature: columnsToIndex may change between revisions
checkColumnsToMatchSchema(latestRevision)
Expand Down

0 comments on commit 2369467

Please sign in to comment.