Issue #398: Fix small overhead added during the refactoring #436

Merged · 5 commits · Oct 17, 2024
core/src/main/scala/io/qbeast/core/model/QbeastStats.scala (1 addition, 1 deletion)

@@ -19,4 +19,4 @@ case class QbeastStats(
     numRecords: Long,
     minValues: Map[String, String],
     maxValues: Map[String, String],
-    nullCount: Map[String, Int])
+    nullCount: Map[String, String])
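Note on the type change: Delta's stats JSON stores null counts as numbers, but the deserializers below keep every leaf value as a string, so the case class relaxes `nullCount` to `Map[String, String]`. A minimal sketch of the new shape (field values are invented; only the `QbeastStats` signature comes from this PR):

  // nullCount values are now strings, mirroring minValues/maxValues;
  // callers that need numbers convert at the use site.
  val stats = QbeastStats(
    numRecords = 3L,
    minValues = Map("a" -> "A", "b" -> "1"),
    maxValues = Map("a" -> "C", "b" -> "3"),
    nullCount = Map("a" -> "0", "b" -> "0"))

  val totalNulls = stats.nullCount.values.map(_.toLong).sum // 0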
DeltaMetadataWriter.scala

@@ -20,6 +20,7 @@ import io.qbeast.core.model.IndexFile
 import io.qbeast.core.model.PreCommitHook
 import io.qbeast.core.model.PreCommitHook.PreCommitHookOutput
 import io.qbeast.core.model.QTableID
+import io.qbeast.core.model.QbeastFile
 import io.qbeast.core.model.QbeastHookLoader
 import io.qbeast.core.model.RevisionID
 import io.qbeast.core.model.TableChanges

@@ -151,10 +152,9 @@ private[delta] case class DeltaMetadataWriter(
    * @return
    *   A Map[String, String] representing the combined outputs of all hooks.
    */
-  private def runPreCommitHooks(actions: Seq[Action]): PreCommitHookOutput = {
-    val qbeastActions = actions.map(DeltaQbeastFileUtils.fromAction)
+  private def runPreCommitHooks(actions: Seq[QbeastFile]): PreCommitHookOutput = {
     preCommitHooks.foldLeft(Map.empty[String, String]) { (acc, hook) =>
-      acc ++ hook.run(qbeastActions)
+      acc ++ hook.run(actions)
     }
   }
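With the Action-to-QbeastFile conversion hoisted out to the caller, `runPreCommitHooks` is now a plain fold that merges each hook's tags. A self-contained sketch of the merge semantics (hypothetical hooks and simplified types, not the real `PreCommitHook` API):

  // `++` merges maps left to right, so if two hooks emit the same key the
  // later hook's value wins.
  trait Hook { def run(files: Seq[String]): Map[String, String] }

  val hooks = Seq(
    new Hook { def run(fs: Seq[String]) = Map("filesWritten" -> fs.size.toString) },
    new Hook { def run(fs: Seq[String]) = Map("validated" -> "true") })

  val tags = hooks.foldLeft(Map.empty[String, String]) { (acc, hook) =>
    acc ++ hook.run(Seq("file1.parquet", "file2.parquet"))
  }
  // tags == Map("filesWritten" -> "2", "validated" -> "true")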

@@ -173,21 +173,24 @@
     // Register metrics to use in the Commit Info
     val statsTrackers = createStatsTrackers(txn)
     registerStatsTrackers(statsTrackers)
-    // Execute write
-    val (changes, indexFiles, deleteFiles) = writer
+
+    // Execute write
+    val (tableChanges, indexFiles, deleteFiles) = writer
     val addFiles = indexFiles.map(DeltaQbeastFileUtils.toAddFile(dataChange = true))
     val removeFiles = deleteFiles.map(DeltaQbeastFileUtils.toRemoveFile(dataChange = false))

     // Update Qbeast Metadata (replicated set, revision..)
-    var actions = updateMetadata(txn, changes, addFiles, removeFiles)
+    var actions = updateMetadata(txn, tableChanges, addFiles, removeFiles)
     // Set transaction identifier if specified
     for (txnVersion <- options.txnVersion; txnAppId <- options.txnAppId) {
       actions +:= SetTransaction(txnAppId, txnVersion, Some(System.currentTimeMillis()))
     }

     // Run pre-commit hooks
-    val tags = runPreCommitHooks(actions)
+    val revision = tableChanges.updatedRevision
+    val dimensionCount = revision.transformations.length
+    val qbeastActions = actions.map(DeltaQbeastFileUtils.fromAction(dimensionCount))
+    val tags = runPreCommitHooks(qbeastActions)

     // Commit the information to the DeltaLog
     val op =
DeltaQbeastFileUtils.scala

@@ -135,9 +135,9 @@ private[delta] object DeltaQbeastFileUtils {
    * @param action
    *   the action instance
    */
-  def fromAction(action: Action): QbeastFile = {
+  def fromAction(dimensionCount: Int)(action: Action): QbeastFile = {
     action match {
-      case addFile: AddFile => fromAddFile(1)(addFile)
+      case addFile: AddFile => fromAddFile(dimensionCount)(addFile)
       case removeFile: RemoveFile => fromRemoveFile(removeFile)
       case _ => null
     }
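The removed line had hardcoded `fromAddFile(1)`, decoding every `AddFile` as if the index had a single dimension. The curried form lets `DeltaMetadataWriter` fix the revision's real `dimensionCount` once and map the resulting function over the whole commit. A runnable sketch with simplified stand-in types (none of these are the real Qbeast classes):

  sealed trait Action
  final case class AddFile(path: String) extends Action
  final case class QbeastFile(path: String, dimensions: Int)

  def fromAction(dimensionCount: Int)(action: Action): QbeastFile =
    action match {
      case AddFile(p) => QbeastFile(p, dimensionCount)
    }

  // Applying the first parameter list yields an Action => QbeastFile, so the
  // dimension count is derived once per commit rather than once per action:
  val actions: Seq[Action] = Seq(AddFile("a.parquet"), AddFile("b.parquet"))
  val qbeastFiles = actions.map(fromAction(2))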
DeltaQbeastSnapshot.scala

@@ -49,9 +49,9 @@ case class DeltaQbeastSnapshot(tableID: QTableID) extends QbeastSnapshot with De
    */
   override def isInitial: Boolean = snapshot.version == -1

-  override val schema: StructType = snapshot.metadata.schema
+  override lazy val schema: StructType = snapshot.metadata.schema

-  override val allFilesCount: Long = snapshot.allFiles.count
+  override lazy val allFilesCount: Long = snapshot.allFiles.count

   private val metadataMap: Map[String, String] = snapshot.metadata.configuration
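Making `schema` and `allFilesCount` lazy is likely where the "small overhead" of the title was being paid: `snapshot.allFiles.count` forces a scan over the Delta log's file list, and a strict `val` runs it every time a `DeltaQbeastSnapshot` is constructed, even on code paths that never read the count. A small self-contained sketch of the `lazy val` semantics (invented names, nothing Qbeast-specific):

  class Snapshot(countAllFiles: () => Long) {
    // Deferred to first access, then cached, so construction is cheap.
    lazy val allFilesCount: Long = countAllFiles()
  }

  var scans = 0
  val s = new Snapshot(() => { scans += 1; 42L })
  // scans == 0: nothing ran at construction
  s.allFilesCount // scans == 1: first access triggers the count
  s.allFilesCount // scans == 1: the cached result is reused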
QbeastStatsUtils.scala

@@ -19,6 +19,7 @@ import com.fasterxml.jackson.core.JsonGenerator
 import com.fasterxml.jackson.core.JsonParseException
 import com.fasterxml.jackson.core.JsonParser
 import com.fasterxml.jackson.databind.module.SimpleModule
+import com.fasterxml.jackson.databind.node.JsonNodeType
 import com.fasterxml.jackson.databind.DeserializationContext
 import com.fasterxml.jackson.databind.JsonDeserializer
 import com.fasterxml.jackson.databind.JsonMappingException

@@ -30,9 +31,12 @@ import io.qbeast.core.model.mapper
 import io.qbeast.core.model.QbeastStats

 private[delta] object QbeastStatsUtils {
-  private val module = new SimpleModule()
-  module.addSerializer(classOf[String], new ValueSerializer)
-  module.addDeserializer(classOf[String], new ValueDeserializer)
+
+  val module: SimpleModule = new SimpleModule()
+    .addSerializer(classOf[String], new ValueSerializer)
+    .addDeserializer(classOf[String], new ValueDeserializer)
+    .addDeserializer(classOf[Map[String, String]], new MapDeserializer)
+
   mapper.registerModule(module)

   def fromString(jsonString: String): Option[QbeastStats] = {
@@ -55,24 +59,49 @@

   }

-class ValueSerializer extends JsonSerializer[String] {
+class ValueSerializer extends JsonSerializer[Any] {

   override def serialize(
-      value: String,
+      value: Any,
       gen: JsonGenerator,
       serializers: SerializerProvider): Unit = {
-    try {
-      val intValue = value.toInt
-      gen.writeNumber(intValue)
-    } catch {
-      case _: NumberFormatException =>
-        try {
-          val doubleValue = value.toDouble
-          gen.writeNumber(doubleValue)
-        } catch {
-          case _: NumberFormatException =>
-            gen.writeString(value)
-        }
+    value match {
+      case m: Map[_, _] =>
+        gen.writeStartObject()
+        m.foreach { case (key, v) =>
+          gen.writeFieldName(key.toString)
+          v match {
+            case nestedMap: Map[_, _] =>
+              serialize(nestedMap, gen, serializers)
+            case s: String =>
+              try {
+                val jsonNode = mapper.readTree(s)
+                gen.writeTree(jsonNode)
+              } catch {
+                case _: Exception =>
+                  gen.writeString(s)
+              }
+            case i: Int => gen.writeNumber(i)
+            case l: Long => gen.writeNumber(l)
+            case d: Double => gen.writeNumber(d)
+            case other => gen.writeString(other.toString)
+          }
+        }
+        gen.writeEndObject()
+
+      case s: String =>
+        try {
+          val jsonNode = mapper.readTree(s)
+          gen.writeTree(jsonNode)
+        } catch {
+          case _: Exception =>
+            gen.writeString(s)
+        }
+
+      case i: Int => gen.writeNumber(i)
+      case l: Long => gen.writeNumber(l)
+      case d: Double => gen.writeNumber(d)
+      case other => gen.writeString(other.toString)
     }
   }

@@ -92,3 +121,26 @@ class ValueDeserializer extends JsonDeserializer[String] {
   }

 }
+
+class MapDeserializer extends JsonDeserializer[Map[String, String]] {
+
+  override def deserialize(p: JsonParser, ctxt: DeserializationContext): Map[String, String] = {
+    val node = p.getCodec.readTree[JsonNode](p)
+    val mapBuilder = scala.collection.mutable.Map[String, String]()
+
+    if (node.isObject) {
+      node.fields().forEachRemaining { entry =>
+        val key = entry.getKey
+        val valueNode = entry.getValue
+        if (valueNode.getNodeType == JsonNodeType.OBJECT) {
+          mapBuilder(key) = valueNode.toString
+        } else {
+          mapBuilder(key) = valueNode.asText()
+        }
+      }
+    }
+
+    mapBuilder.toMap
+  }
+
+}
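Taken together, the module round-trips stats whose values may be plain strings, numbers, or nested objects. A hedged round-trip sketch (the sample JSON is invented; `mapper` is the shared `ObjectMapper` from `io.qbeast.core.model` that the module is registered on above):

  // MapDeserializer keeps numeric leaves as Scala strings and flattens
  // nested objects to their JSON text.
  val json = """{"a": 1, "b": "x", "c": {"inner": 2}}"""
  val parsed = mapper.readValue(json, classOf[Map[String, String]])
  // parsed == Map("a" -> "1", "b" -> "x", "c" -> """{"inner":2}""")

  // On the way out, ValueSerializer re-parses each string: "1" is written
  // back as the number 1 and stored JSON text is re-inlined as an object,
  // so the output is JSON-equivalent to the original input.
  val back = mapper.writeValueAsString(parsed)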
delta/src/test/scala/io/qbeast/spark/delta/DeltaQbeastStatsTest.scala (new file, 102 additions)

@@ -0,0 +1,102 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.spark.delta

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import io.qbeast.core.model.mapper
import io.qbeast.core.model.QbeastStats
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class DeltaQbeastStatsTest extends AnyFlatSpec with Matchers {

mapper.registerModule(QbeastStatsUtils.module)

def areJsonEqual(json1: String, json2: String): Boolean = {
val basicMapper = new ObjectMapper()

try {
val node1: JsonNode = basicMapper.readTree(json1)
val node2: JsonNode = basicMapper.readTree(json2)

node1.equals(node2)
} catch {
case e: Exception =>
println(s"Error parsing JSON: ${e.getMessage}")
false
}
}

"ValueSerializer" should "serialize a numeric string to a number" in {
val json = mapper.writeValueAsString("123") // This uses ValueSerializer
json shouldEqual "123" // It should output a number in JSON
}

it should "serialize a non-numeric string as is" in {
val json = mapper.writeValueAsString("hello") // This uses ValueSerializer
json shouldEqual "\"hello\""
}

"ValueDeserializer" should "deserialize a number string to a string" in {
val value = mapper.readValue("123", classOf[String]) // This uses ValueDeserializer
value shouldEqual "123"
}

it should "deserialize a textual value as is" in {
val value = mapper.readValue("\"hello\"", classOf[String]) // This uses ValueDeserializer
value shouldEqual "hello"
}

"MapDeserializer" should "deserialize a JSON object to a Map[String, String]" in {
val json = """{"key1": "value1", "key2": {"innerKey": "innerValue"}}"""
val result = mapper.readValue(json, classOf[Map[String, String]]) // This uses MapDeserializer

result should contain("key1" -> "value1")
result should contain("key2" -> """{"innerKey":"innerValue"}""")
}

"QbeastStatsUtils" should "serialize and deserialize correctly" in {
val jsonString =
"""{"numRecords":52041,"minValues":{"key1": "value1", "key2": {"innerKey": "innerValue"}},
|"maxValues":{"key3": "value3", "key4": "value4"},"nullCount":{"key5": 0, "key6": 2}}""".stripMargin

// Create the expected QbeastStats object
val expectedStats = QbeastStats(
numRecords = 52041,
minValues = Map("key2" -> """{"innerKey":"innerValue"}""", "key1" -> "value1"),
maxValues = Map("key3" -> "value3", "key4" -> "value4"),
nullCount = Map("key5" -> "0", "key6" -> "2"))

// Deserialize the JSON string
val deserializedStats = QbeastStatsUtils.fromString(jsonString)

// Verify that deserialization was successful and matches expected values
deserializedStats shouldBe defined // Ensure deserialization was successful

deserializedStats.foreach { ds =>
ds.numRecords shouldEqual expectedStats.numRecords
ds.minValues should contain theSameElementsAs expectedStats.minValues
ds.maxValues should contain theSameElementsAs expectedStats.maxValues
ds.nullCount should contain theSameElementsAs expectedStats.nullCount
}

// Serialize back to JSON string and verify it matches the original
val serializedJsonString = QbeastStatsUtils.toString(deserializedStats.get)
areJsonEqual(serializedJsonString, jsonString) shouldBe true
}

}
src/main/scala/io/qbeast/sources/QbeastBaseRelation.scala (2 additions, 3 deletions)

@@ -58,15 +58,14 @@ object QbeastBaseRelation {
     val spark = SparkSession.active
     val tableID = table.tableID
     val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID)
-    val schema = QbeastContext.metadataManager.loadCurrentSchema(tableID)
     if (snapshot.isInitial) {
       // If the Table is initial, read empty relation
       // This could happen if we CREATE/REPLACE TABLE without inserting data
       // In this case, we use the options variable
       new HadoopFsRelation(
         EmptyFileIndex,
         partitionSchema = StructType(Seq.empty[StructField]),
-        dataSchema = schema,
+        dataSchema = snapshot.schema,
         bucketSpec = None,
         new ParquetFileFormat(),
         options)(spark) with InsertableRelation {

@@ -87,7 +86,7 @@
       new HadoopFsRelation(
         fileIndex,
         partitionSchema = StructType(Seq.empty[StructField]),
-        dataSchema = schema,
+        dataSchema = snapshot.schema,
         bucketSpec = bucketSpec,
         file,
         parameters)(spark) with InsertableRelation {
QbeastDeltaIntegrationTest.scala

@@ -15,6 +15,8 @@
  */
 package io.qbeast.spark.delta

+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.ObjectMapper
 import io.delta.tables._
 import io.qbeast.QbeastIntegrationTestSpec
 import org.apache.spark.sql.delta.DeltaLog

@@ -31,6 +33,21 @@ class QbeastDeltaIntegrationTest extends QbeastIntegrationTestSpec {
     Seq(("A", 1), ("B", 2), ("C", 3)).toDF("a", "b")
   }

+  def areJsonEqual(json1: String, json2: String): Boolean = {
+    val basicMapper = new ObjectMapper()
+
+    try {
+      val node1: JsonNode = basicMapper.readTree(json1)
+      val node2: JsonNode = basicMapper.readTree(json2)
+
+      node1.equals(node2)
+    } catch {
+      case e: Exception =>
+        println(s"Error parsing JSON: ${e.getMessage}")
+        false
+    }
+  }
+
   "Qbeast" should "output correctly Operation Metrics in Delta History" in
     withQbeastContextSparkAndTmpDir((spark, tmpDir) => {

@@ -63,10 +80,13 @@
     val stats =
       DeltaLog.forTable(spark, tmpDir).unsafeVolatileSnapshot.allFiles.collect().map(_.stats)
     stats.length shouldBe >(0)
-    stats.head shouldBe "{\"numRecords\":3,\"minValues\":{\"a\":\"A\",\"b\":1}," +
-      "\"maxValues\":{\"a\":\"C\",\"b\":3}," +
-      "\"nullCount\":{\"a\":0,\"b\":0}}"
+
+    val expectedStats =
+      """{"numRecords":3,"minValues":{"a":"A","b":1},
+        |"maxValues":{"a":"C","b":3},
+        |"nullCount":{"a":0,"b":0}}""".stripMargin
+
+    areJsonEqual(stats.head, expectedStats) shouldBe true
   })

   it should "not write stats when specified" in withExtendedSparkAndTmpDir(