Issue Qbeast-io#476: Add qbeast WriteMode
JosepSampe authored Nov 21, 2024
1 parent 6767eea commit c5c0397
Showing 7 changed files with 94 additions and 52 deletions.
core/src/main/scala/io/qbeast/core/model/MetadataManager.scala
@@ -15,6 +15,7 @@
*/
package io.qbeast.core.model

import io.qbeast.core.model.WriteMode.WriteModeValue
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.IISeq
import org.apache.spark.sql.types.StructType
@@ -58,7 +59,7 @@ trait MetadataManager {
tableID: QTableID,
schema: StructType,
options: QbeastOptions,
append: Boolean)(
writeMode: WriteModeValue)(
writer: String => (TableChanges, IISeq[IndexFile], IISeq[DeleteFile])): Unit

/**
27 changes: 27 additions & 0 deletions core/src/main/scala/io/qbeast/core/model/WriteMode.scala
@@ -0,0 +1,27 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.core.model

/**
* Names of possible write modes
*/

object WriteMode {
type WriteModeValue = String
final val Append: WriteModeValue = "APPEND"
final val Overwrite: WriteModeValue = "OVERWRITE"
final val Optimize: WriteModeValue = "OPTIMIZE"
}
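
Since WriteModeValue is a plain type alias for String, the three constants compare with == and work as stable identifiers in pattern matches. A minimal usage sketch (not part of the commit; the chosen mode and printed text are illustrative only):

import io.qbeast.core.model.WriteMode
import io.qbeast.core.model.WriteMode.WriteModeValue

// The constants are final vals in an object, so they can be used as
// stable identifiers in a match on the underlying String value.
val mode: WriteModeValue = WriteMode.Optimize
mode match {
  case WriteMode.Append | WriteMode.Optimize => println("commits as an append")
  case WriteMode.Overwrite                   => println("commits as an overwrite")
  case other                                 => println(s"unknown write mode: $other")
}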
src/main/scala/io/qbeast/spark/delta/DeltaMetadataManager.scala
@@ -16,11 +16,11 @@
package io.qbeast.spark.delta

import io.qbeast.core.model._
import io.qbeast.core.model.WriteMode.WriteModeValue
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.IISeq
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession

/**
@@ -32,14 +32,12 @@ object DeltaMetadataManager extends MetadataManager {
tableID: QTableID,
schema: StructType,
options: QbeastOptions,
append: Boolean)(
writeMode: WriteModeValue)(
writer: String => (TableChanges, IISeq[IndexFile], IISeq[DeleteFile])): Unit = {

val deltaLog = loadDeltaLog(tableID)
val mode = if (append) SaveMode.Append else SaveMode.Overwrite

val metadataWriter =
DeltaMetadataWriter(tableID, mode, deltaLog, options, schema)
DeltaMetadataWriter(tableID, writeMode, deltaLog, options, schema)

metadataWriter.writeWithTransaction(writer)
}
@@ -48,7 +46,7 @@ object DeltaMetadataManager extends MetadataManager {
update: => Configuration): Unit = {
val deltaLog = loadDeltaLog(tableID)
val metadataWriter =
DeltaMetadataWriter(tableID, mode = SaveMode.Append, deltaLog, QbeastOptions.empty, schema)
DeltaMetadataWriter(tableID, WriteMode.Append, deltaLog, QbeastOptions.empty, schema)

metadataWriter.updateMetadataWithTransaction(update)

src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
@@ -24,6 +24,8 @@ import io.qbeast.core.model.QbeastFile
import io.qbeast.core.model.QbeastHookLoader
import io.qbeast.core.model.RevisionID
import io.qbeast.core.model.TableChanges
import io.qbeast.core.model.WriteMode
import io.qbeast.core.model.WriteMode.WriteModeValue
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.utils.QbeastExceptionMessages.partitionedTableExceptionMsg
import io.qbeast.spark.utils.TagColumns
@@ -63,7 +65,7 @@ import scala.collection.mutable.ListBuffer
*/
private[delta] case class DeltaMetadataWriter(
tableID: QTableID,
mode: SaveMode,
writeMode: WriteModeValue,
deltaLog: DeltaLog,
qbeastOptions: QbeastOptions,
schema: StructType)
@@ -76,7 +78,12 @@ private[delta] case class DeltaMetadataWriter(
new DeltaOptions(optionsMap, SparkSession.active.sessionState.conf)
}

private def isOverwriteOperation: Boolean = mode == SaveMode.Overwrite
private def isOverwriteOperation: Boolean = writeMode == WriteMode.Overwrite

private def isOptimizeOperation: Boolean = writeMode == WriteMode.Optimize

private def saveMode =
if (!isOverwriteOperation || isOptimizeOperation) SaveMode.Append else SaveMode.Overwrite

override protected val canMergeSchema: Boolean = deltaOptions.canMergeSchema
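
The derived saveMode above folds the three write modes onto Delta's two-valued SaveMode: only an explicit Overwrite becomes SaveMode.Overwrite, while Append and Optimize both commit as appends, since an optimize rewrites files without logically replacing the table's data. An equivalent standalone sketch (toSaveMode is a hypothetical helper, not part of the commit):

import io.qbeast.core.model.WriteMode
import io.qbeast.core.model.WriteMode.WriteModeValue
import org.apache.spark.sql.SaveMode

// Same truth table as the saveMode definition above:
// Append -> Append, Optimize -> Append, Overwrite -> Overwrite.
def toSaveMode(writeMode: WriteModeValue): SaveMode =
  if (writeMode == WriteMode.Overwrite) SaveMode.Overwrite
  else SaveMode.Append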

@@ -198,7 +205,11 @@

// Commit the information to the DeltaLog
val op =
DeltaOperations.Write(mode, None, deltaOptions.replaceWhere, deltaOptions.userMetadata)
DeltaOperations.Write(
saveMode,
None,
deltaOptions.replaceWhere,
deltaOptions.userMetadata)
txn.commit(actions = actions, op = op, tags = tags)
}
}
@@ -274,11 +285,11 @@

if (txn.readVersion > -1) {
// This table already exists, check if the insert is valid.
if (mode == SaveMode.ErrorIfExists) {
if (saveMode == SaveMode.ErrorIfExists) {
throw AnalysisExceptionFactory.create(s"Path '${deltaLog.dataPath}' already exists.'")
} else if (mode == SaveMode.Ignore) {
} else if (saveMode == SaveMode.Ignore) {
return Nil
} else if (mode == SaveMode.Overwrite) {
} else if (saveMode == SaveMode.Overwrite) {
DeltaLog.assertRemovable(txn.snapshot)
}
}
@@ -304,7 +315,7 @@
fs.mkdirs(deltaLog.logPath)
}

val deletedFiles = mode match {
val deletedFiles = saveMode match {
case SaveMode.Overwrite =>
txn.filterFiles().map(_.remove)
case _ => removeFiles
60 changes: 32 additions & 28 deletions src/main/scala/io/qbeast/table/IndexedTable.scala
@@ -495,7 +495,8 @@ private[table] class IndexedTableImpl(
case StagingResolution(dataToWrite, removeFiles, false) =>
val schema = dataToWrite.schema
val deleteFiles = removeFiles.toIndexedSeq
metadataManager.updateWithTransaction(tableID, schema, options, append) {
val writeMode = if (append) WriteMode.Append else WriteMode.Overwrite
metadataManager.updateWithTransaction(tableID, schema, options, writeMode) {
transactionStartTime: String =>
val (qbeastData, tableChanges) = indexManager.index(dataToWrite, indexStatus)
val addFiles =
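
For context on where the append flag originates, a hedged sketch of a standard qbeast-spark write through the DataFrame API (the "qbeast" format and columnsToIndex option follow qbeast-spark's documented usage; the path and column are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("qbeast-write-sketch").getOrCreate()

// .mode("append") reaches IndexedTableImpl as append = true, which the
// line added above translates to WriteMode.Append; .mode("overwrite")
// becomes WriteMode.Overwrite.
spark.range(1000).toDF("id")
  .write
  .format("qbeast")
  .option("columnsToIndex", "id") // qbeast-spark's documented indexing option
  .mode("append")
  .save("/tmp/qbeast/table") // illustrative path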
@@ -599,7 +600,7 @@ private[table] class IndexedTableImpl(
tableID,
schema,
optimizationOptions(options),
append = true) { transactionStartTime: String =>
WriteMode.Optimize) { transactionStartTime: String =>
// Remove the Unindexed Files from the Log
val deleteFiles: IISeq[DeleteFile] = files
.map { indexFile =>
@@ -642,32 +643,35 @@
val indexStatus = snapshot.loadIndexStatus(revision.revisionID)
// 3. In the same transaction
metadataManager
.updateWithTransaction(tableID, schema, optimizationOptions(options), append = true) {
transactionStartTime: String =>
import indexFiles.sparkSession.implicits._
val deleteFiles: IISeq[DeleteFile] = indexFiles
.map { indexFile =>
DeleteFile(
path = indexFile.path,
size = indexFile.size,
dataChange = false,
deletionTimestamp = currentTimeMillis())
}
.collect()
.toIndexedSeq
// 1. Load the data from the Index Files
val data = snapshot.loadDataframeFromIndexFiles(indexFiles)
// 2. Optimize the data with IndexManager
val (dataExtended, tableChanges) =
DoublePassOTreeDataAnalyzer.analyzeOptimize(data, indexStatus)
// 3. Write the data with DataWriter
val addFiles = dataWriter
.write(tableID, schema, dataExtended, tableChanges, transactionStartTime)
.collect { case indexFile: IndexFile =>
indexFile.copy(dataChange = false)
}
dataExtended.unpersist()
(tableChanges, addFiles, deleteFiles)
.updateWithTransaction(
tableID,
schema,
optimizationOptions(options),
WriteMode.Optimize) { transactionStartTime: String =>
import indexFiles.sparkSession.implicits._
val deleteFiles: IISeq[DeleteFile] = indexFiles
.map { indexFile =>
DeleteFile(
path = indexFile.path,
size = indexFile.size,
dataChange = false,
deletionTimestamp = currentTimeMillis())
}
.collect()
.toIndexedSeq
// 1. Load the data from the Index Files
val data = snapshot.loadDataframeFromIndexFiles(indexFiles)
// 2. Optimize the data with IndexManager
val (dataExtended, tableChanges) =
DoublePassOTreeDataAnalyzer.analyzeOptimize(data, indexStatus)
// 3. Write the data with DataWriter
val addFiles = dataWriter
.write(tableID, schema, dataExtended, tableChanges, transactionStartTime)
.collect { case indexFile: IndexFile =>
indexFile.copy(dataChange = false)
}
dataExtended.unpersist()
(tableChanges, addFiles, deleteFiles)
}
}
}
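
The indexFile.copy(dataChange = false) above follows Delta's convention that actions flagged dataChange = false rearrange existing data rather than change it, which lets streaming readers of the log skip optimize commits. A sketch of the invariant an optimize transaction should satisfy (isDataNeutral is a hypothetical helper, not part of the commit; field access assumes IndexFile and DeleteFile expose their dataChange parameters):

import io.qbeast.core.model.{DeleteFile, IndexFile}

// Every file an optimize adds or deletes carries dataChange = false,
// so the commit is data-neutral from a reader's perspective.
def isDataNeutral(addFiles: Seq[IndexFile], deleteFiles: Seq[DeleteFile]): Boolean =
  addFiles.forall(!_.dataChange) && deleteFiles.forall(!_.dataChange)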
6 changes: 3 additions & 3 deletions src/test/scala/io/qbeast/spark/delta/MetadataWriterTest.scala
@@ -17,18 +17,18 @@ package io.qbeast.spark.delta

import io.qbeast.core.model.QTableID
import io.qbeast.core.model.TableChanges
import io.qbeast.core.model.WriteMode.WriteModeValue
import io.qbeast.spark.internal.QbeastOptions
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.actions.RemoveFile
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.OptimisticTransaction
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.SaveMode

class MetadataWriterTest(
tableID: QTableID,
mode: SaveMode,
mode: WriteModeValue,
deltaLog: DeltaLog,
options: QbeastOptions,
schema: StructType)
@@ -49,7 +49,7 @@ object MetadataWriterTest {

def apply(
tableID: QTableID,
mode: SaveMode,
mode: WriteModeValue,
deltaLog: DeltaLog,
options: QbeastOptions,
schema: StructType): MetadataWriterTest = {
15 changes: 8 additions & 7 deletions src/test/scala/io/qbeast/spark/delta/keeper/ProtocolMock.scala
@@ -144,8 +144,9 @@ class WritingProcess(context: ProtoTestContext)(implicit keeper: Keeper) extends
val winfo = keeper.beginWrite(tableID, rev.revisionID)

val deltaLog = DeltaMetadataManager.loadDeltaLog(tableID)
val mode = SaveMode.Append
val metadataWriter = MetadataWriterTest(tableID, mode, deltaLog, QbeastOptions.empty, schema)

val metadataWriter =
MetadataWriterTest(tableID, WriteMode.Append, deltaLog, QbeastOptions.empty, schema)

var tries = 2
try {
@@ -160,7 +161,7 @@
metadataWriter.updateMetadata(txn, changes, addFiles, removeFiles, Map.empty)
simulatePause()
try {
txn.commit(finalActions, DeltaOperations.Write(mode))
txn.commit(finalActions, DeltaOperations.Write(SaveMode.Append))
tries = 0
succeeded = Some(true)
} catch {
@@ -196,8 +197,8 @@ class OptimizingProcessGood(context: ProtoTestContext)(implicit keeper: Keeper)

val deltaLog = DeltaMetadataManager.loadDeltaLog(tableID)
val deltaSnapshot = deltaLog.update()
val mode = SaveMode.Append
val metadataWriter = MetadataWriterTest(tableID, mode, deltaLog, QbeastOptions.empty, schema)
val metadataWriter =
MetadataWriterTest(tableID, WriteMode.Append, deltaLog, QbeastOptions.empty, schema)
val cubesToOptimize = bo.cubesToOptimize.map(rev.createCubeId)

try {
@@ -229,8 +230,8 @@ class OptimizingProcessBad(context: ProtoTestContext, args: Seq[String])(implici

val deltaLog = DeltaMetadataManager.loadDeltaLog(tableID)
val deltaSnapshot = deltaLog.update()
val mode = SaveMode.Append
val metadataWriter = MetadataWriterTest(tableID, mode, deltaLog, QbeastOptions.empty, schema)
val metadataWriter =
MetadataWriterTest(tableID, WriteMode.Append, deltaLog, QbeastOptions.empty, schema)

val cubesToOptimize = args.toSet
