Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #489: Extend QbeastSnapshot Interface to Expose Relevant Information #492

Merged
merged 4 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ trait QbeastSnapshot {
/**
* Returns the total number of data files in the snapshot.
*/
def allFilesCount: Long
def numOfFiles: Long

/**
* Provides the schema of the dataset for this snapshot.
Expand All @@ -55,6 +55,12 @@ trait QbeastSnapshot {
*/
def loadDescription: String

/**
* The current table configuration.
* @return
*/
def loadConfiguration: Map[String, String]

/**
* The current table properties of the snapshot.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ case class SamplingListFilesStrategy(snapshot: QbeastSnapshot)
{
val context = SparkSession.active.sparkContext
val execId = context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
val count = snapshot.allFilesCount
val count = snapshot.numOfFiles
val filteredCount = count - files.length
val filteredPercent = (filteredCount.toDouble / count) * 100.0
val info = f"$filteredCount of $count ($filteredPercent%.2f%%)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ case class DeltaQbeastSnapshot(tableID: QTableID) extends QbeastSnapshot with De

override val basePath: Path = new Path(tableID.id)

override val snapshot: Snapshot =
DeltaLog.forTable(SparkSession.active, tableID.id).update()
private val deltaLog: DeltaLog = DeltaLog.forTable(SparkSession.active, tableID.id)
osopardo1 marked this conversation as resolved.
Show resolved Hide resolved

override val snapshot: Snapshot = deltaLog.update()

/**
* The current state of the snapshot.
Expand All @@ -58,7 +59,7 @@ case class DeltaQbeastSnapshot(tableID: QTableID) extends QbeastSnapshot with De

override lazy val schema: StructType = snapshot.metadata.schema

override lazy val allFilesCount: Long = snapshot.allFiles.count
override lazy val numOfFiles: Long = snapshot.numOfFiles

private val metadataMap: Map[String, String] = snapshot.metadata.configuration

Expand All @@ -78,6 +79,12 @@ case class DeltaQbeastSnapshot(tableID: QTableID) extends QbeastSnapshot with De
*/
override def loadDescription: String = snapshot.metadata.description

/**
* The current table configuration.
* @return
*/
override def loadConfiguration: Map[String, String] = metadataMap

/**
* Constructs revision dictionary
*
Expand Down Expand Up @@ -213,10 +220,10 @@ case class DeltaQbeastSnapshot(tableID: QTableID) extends QbeastSnapshot with De

/**
* Loads the dataset of qbeast blocks from index files
* @param indexFile
* @param indexFiles
* A dataset of index files
* @return
* the Datasetframe
* the DataFrame
*/

override def loadDataframeFromIndexFiles(indexFiles: Dataset[IndexFile]): DataFrame = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.utils
package io.qbeast.spark.delta

import io.qbeast.core.model.StagingUtils
import io.qbeast.internal.commands.ConvertToQbeastCommand
Expand All @@ -25,7 +25,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.scalatest.PrivateMethodTester

class ConvertToQbeastTest
class ConvertToQbeastDeltaTest
extends QbeastIntegrationTestSpec
with PrivateMethodTester
with StagingUtils {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.OptimisticTransaction
import org.apache.spark.sql.types.StructType

class MetadataWriterTest(
class MetadataWriterDeltaTest(
tableID: QTableID,
mode: WriteModeValue,
deltaLog: DeltaLog,
Expand All @@ -45,15 +45,15 @@ class MetadataWriterTest(

}

object MetadataWriterTest {
object MetadataWriterDeltaTest {

def apply(
tableID: QTableID,
mode: WriteModeValue,
deltaLog: DeltaLog,
options: QbeastOptions,
schema: StructType): MetadataWriterTest = {
new MetadataWriterTest(tableID, mode, deltaLog, options, schema)
schema: StructType): MetadataWriterDeltaTest = {
new MetadataWriterDeltaTest(tableID, mode, deltaLog, options, schema)
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.qbeast.spark.utils
package io.qbeast.spark.delta

import io.qbeast.context.QbeastContext
import io.qbeast.core.model.PreCommitHook
Expand All @@ -24,7 +24,7 @@ private class SimpleHook(kv: String) extends PreCommitHook {

}

class PreCommitHookIntegrationTest extends QbeastIntegrationTestSpec {
class PreCommitHookDeltaTest extends QbeastIntegrationTestSpec {

"PreCommitHook" should "run simple hooks and save their outputs to CommitInfo during writes" in
withQbeastContextSparkAndTmpDir { (spark, tmpDir) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.utils
package io.qbeast.spark.delta

import io.qbeast.QbeastIntegrationTestSpec
import org.apache.spark.sql.AnalysisException
Expand All @@ -22,7 +22,7 @@ import org.apache.spark.sql.Row
/**
* Test for checking the correctness of the output schemas when Appending Data through INSERT INTO
*/
class QbeastSchemaTest extends QbeastIntegrationTestSpec {
class QbeastSchemaDeltaTest extends QbeastIntegrationTestSpec {

"Qbeast" should "detect when schemas does not match on INSERT INTO" in
withQbeastContextSparkAndTmpWarehouse((spark, _) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class DocumentationTests extends QbeastIntegrationTestSpec {
val qbeast_df = spark.read.format("qbeast").load(qbeast_table_path)

val qbeastSnapshot = getQbeastSnapshot(qbeast_table_path)
val totalNumberOfFiles = qbeastSnapshot.allFilesCount
val totalNumberOfFiles = qbeastSnapshot.numOfFiles

totalNumberOfFiles should be > 1L withClue
"Total number of files in pushdown notebook changes to " + totalNumberOfFiles
Expand Down