diff --git a/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala b/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
index aff3d2ab4..60218f8e7 100644
--- a/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
+++ b/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
@@ -25,8 +25,6 @@ object QbeastOptions {
   val PATH = "path"
   val STATS = "columnStats"
 
-  private final val keys = Seq(COLUMNS_TO_INDEX, CUBE_SIZE, PATH, STATS)
-
   /**
    * Gets the columns to index from the options
    * @param options the options passed on the dataframe
@@ -99,10 +97,6 @@ object QbeastOptions {
       }))
   }
 
-  def loadQbeastOptions(options: Map[String, String]): Map[String, String] = {
-    options.filter(keys.contains)
-  }
-
   def checkQbeastOptions(options: Map[String, String]): Unit = {
     require(
       options.contains("columnsToIndex") || options.contains("columnstoindex"),
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
index 3cfd40a6e..5f25a216c 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
@@ -5,14 +5,9 @@ package io.qbeast.spark.internal.sources.catalog
 
 import io.qbeast.context.QbeastContext.metadataManager
 import io.qbeast.core.model.QTableID
-import io.qbeast.spark.internal.QbeastOptions.{
-  COLUMNS_TO_INDEX,
-  CUBE_SIZE,
-  checkQbeastOptions,
-  loadQbeastOptions
-}
+import io.qbeast.spark.internal.QbeastOptions._
 import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
-import io.qbeast.spark.table.{IndexedTable, IndexedTableFactory}
+import io.qbeast.spark.table.IndexedTableFactory
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
@@ -24,6 +19,7 @@ import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{
+  AnalysisException,
   AnalysisExceptionFactory,
   DataFrame,
   SaveMode,
@@ -137,51 +133,6 @@ object QbeastCatalogUtils {
     }
   }
 
-  /**
-   * Loads the Qbeast options if necessary
-   * @param tableType the table type
-   * @param indexedTable the indexed table
-   * @param properties the properties
-   * @return
-   */
-  private def loadQbeastRequiredOptions(
-      tableType: CatalogTableType,
-      indexedTable: IndexedTable,
-      properties: Map[String, String]): Map[String, String] = {
-
-    val isExternal = tableType == CatalogTableType.EXTERNAL
-    val containsColumnsToIndex =
-      properties.contains("columnsToIndex") || properties.contains("columnstoindex")
-    if (isExternal && indexedTable.exists) {
-      // IF the table is EXTERNAL and EXISTS physically,
-      // Check and Add Qbeast Properties
-      (containsColumnsToIndex, indexedTable.isConverted) match {
-        case (false, true) =>
-          // If it does NOT contain Qbeast Properties AND is converted, load the latest revision
-          val qbeastSnapshot = metadataManager.loadSnapshot(indexedTable.tableID)
-          val latestRevision = qbeastSnapshot.loadLatestRevision
-          val columnsToIndex = latestRevision.columnTransformers.map(_.columnName).mkString(",")
-          Map(
-            COLUMNS_TO_INDEX -> columnsToIndex,
-            CUBE_SIZE -> latestRevision.desiredCubeSize.toString)
-        case (_, false) =>
-          // If it is NOT converted, throw error
-          throw AnalysisExceptionFactory.create(
-            "The table you are trying to create is not Qbeast Formatted. " +
-              "Please specify the columns to index and the cube size in the options. " +
-              "You can also use the ConvertToQbeastCommand before creating the table.")
-        case _ =>
-          // If it contains Qbeast Properties, check them
-          checkQbeastOptions(properties)
-          loadQbeastOptions(properties)
-      }
-    } else {
-      // If it's NOT external, check the properties
-      checkQbeastOptions(properties)
-      loadQbeastOptions(properties)
-    }
-  }
-
   /**
    * Creates a Table on the Catalog
    * @param ident the Identifier of the table
@@ -230,8 +181,23 @@
 
     // Process the parameters/options/configuration sent to the table
     val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString))
-    val allProperties =
-      properties ++ loadQbeastRequiredOptions(tableType, indexedTable, properties)
+    val allProperties = {
+      if (tableType == CatalogTableType.EXTERNAL &&
+          indexedTable.exists && !indexedTable.isConverted) {
+        throw AnalysisExceptionFactory.create(
+          "The table you are trying to create is not in Qbeast format. " +
+            "Please convert it with ConvertToQbeastCommand before creating the table.")
+      } else {
+        try {
+          checkQbeastOptions(properties)
+          properties // Qbeast options already present; nothing to add
+        } catch {
+          // Missing or invalid options: fall back to those stored in the latest revision.
+          // checkQbeastOptions uses require, so IllegalArgumentException must be caught too.
+          case _: AnalysisException | _: IllegalArgumentException =>
+            properties ++ indexedTable.properties
+        }
+      }
+    }
 
     // Initialize the path option
     val storage = DataSource
diff --git a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
index 996274184..3bff0c313 100644
--- a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
+++ b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
@@ -8,7 +8,7 @@ import io.qbeast.core.model._
 import io.qbeast.spark.delta.{CubeDataLoader, StagingDataManager, StagingResolution}
 import io.qbeast.spark.index.QbeastColumns
 import io.qbeast.spark.internal.QbeastOptions
-import io.qbeast.spark.internal.QbeastOptions.{COLUMNS_TO_INDEX, CUBE_SIZE}
+import io.qbeast.spark.internal.QbeastOptions._
 import io.qbeast.spark.internal.sources.QbeastBaseRelation
 import org.apache.spark.qbeast.config.DEFAULT_NUMBER_OF_RETRIES
 import org.apache.spark.sql.delta.actions.FileAction
@@ -43,6 +43,13 @@ trait IndexedTable {
    */
   def tableID: QTableID
 
+  /**
+   * Returns the Qbeast index properties (columnsToIndex, cubeSize) stored in the table.
+   *
+   * @return the properties, or an empty map if the table is not Qbeast-formatted
+   */
+  def properties: Map[String, String]
+
   /**
    * Saves given data in the table and updates the index. The specified columns are
    * used to define the index when the table is created or overwritten. The append
@@ -154,6 +161,19 @@ private[table] class IndexedTableImpl(
       case _: Exception => false
   }
 
+  override def properties: Map[String, String] = {
+    if (exists && isConverted) {
+      val latestRevision = snapshot.loadLatestRevision
+      Map(
+        COLUMNS_TO_INDEX -> latestRevision.columnTransformers
+          .map(_.columnName)
+          .mkString(","),
+        CUBE_SIZE -> latestRevision.desiredCubeSize.toString)
+    } else {
+      Map.empty
+    }
+  }
+
   private def isNewRevision(qbeastOptions: QbeastOptions, latestRevision: Revision): Boolean = {
     // TODO feature: columnsToIndex may change between revisions
     checkColumnsToMatchSchema(latestRevision)
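
For context, a minimal usage sketch of what this patch enables. It is not part of the diff: the table name, location, and session setup are illustrative assumptions, and the QbeastCatalog class is assumed to live alongside QbeastCatalogUtils in the package shown above.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session wiring; adjust master and catalog to your deployment.
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config(
    "spark.sql.catalog.spark_catalog",
    "io.qbeast.spark.internal.sources.catalog.QbeastCatalog")
  .getOrCreate()

// '/tmp/qbeast/student' is a placeholder path assumed to already hold
// Qbeast-formatted data (e.g. produced by ConvertToQbeastCommand).
// With this patch the DDL below needs no columnsToIndex/cubeSize options:
// checkQbeastOptions fails, the failure is caught in createTable, and
// IndexedTable.properties recovers both values from the latest revision.
spark.sql("""
  CREATE TABLE student
  USING qbeast
  LOCATION '/tmp/qbeast/student'
""")

// An existing location that is NOT Qbeast-formatted still fails fast with
// an AnalysisException pointing the user to ConvertToQbeastCommand.
```

Routing the fallback through `IndexedTable.properties` keeps revision loading in one place instead of duplicating it in `QbeastCatalogUtils`, which is what lets `loadQbeastOptions` and `loadQbeastRequiredOptions` be deleted.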