Skip to content

Commit

Permalink
Issue Qbeast-io#224: Handling single-value appends (Qbeast-io#462)
Browse files Browse the repository at this point in the history
* Deprecate NullToZero and IdentityToZero Transformations, add IdentityTransformations, add tests
  • Loading branch information
Jiaweihu08 authored Nov 15, 2024
1 parent e63f59c commit f9313b1
Show file tree
Hide file tree
Showing 10 changed files with 754 additions and 364 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ trait CDFQuantilesTransformer extends Transformer {
/**
* The name of the column transformer to retrieve the stats from
*/
val columnTransformerName: String = s"${columnName}_quantiles"
def columnTransformerName: String = s"${columnName}_quantiles"

/**
* Returns the stats
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright 2021 Qbeast Analytics, S.L.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.qbeast.core.transform

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.TreeNode
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.databind.annotation.JsonSerialize
import com.fasterxml.jackson.databind.deser.std.StdDeserializer
import com.fasterxml.jackson.databind.jsontype.TypeSerializer
import com.fasterxml.jackson.databind.node.DoubleNode
import com.fasterxml.jackson.databind.node.IntNode
import com.fasterxml.jackson.databind.node.NullNode
import com.fasterxml.jackson.databind.node.NumericNode
import com.fasterxml.jackson.databind.node.TextNode
import com.fasterxml.jackson.databind.ser.std.StdSerializer
import com.fasterxml.jackson.databind.DeserializationContext
import com.fasterxml.jackson.databind.SerializerProvider
import io.qbeast.core.model._

/**
 * Transformation that maps every input to 0.0 and carries a single identity value.
 *
 * @param identityValue
 *   the value this transformation was built from; may be null
 * @param orderedDataType
 *   the ordered data type of the column; its ordering is used to compare values when merging
 */
@JsonSerialize(using = classOf[IdentityTransformationSerializer])
@JsonDeserialize(using = classOf[IdentityTransformationDeserializer])
case class IdentityTransformation(identityValue: Any, orderedDataType: OrderedDataType)
    extends Transformation {

  import orderedDataType.ordering._

  /** Always returns 0.0, regardless of the input value. */
  override def transform(value: Any): Double = 0.0

  /**
   * Determines whether the new transformation should replace this one.
   *
   * A same-typed IdentityTransformation supersedes this one only when it carries a non-null
   * value and this one is either null-valued or holds a different value. Any
   * LinearTransformation supersedes this one. Everything else does not.
   */
  override def isSupersededBy(newTransformation: Transformation): Boolean =
    newTransformation match {
      case IdentityTransformation(newIdValue, newType) if newType == orderedDataType =>
        if (newIdValue == null) false
        else if (identityValue == null) true
        else newIdValue != identityValue
      case _: LinearTransformation => true
      case _ => false
    }

  /**
   * Merges this transformation with another one of the same ordered data type.
   *
   * @param other
   *   the transformation to merge with
   * @return
   *   the other LinearTransformation when it already covers the identity value (or the identity
   *   value is null), a new LinearTransformation spanning both otherwise; for another
   *   IdentityTransformation, whichever side holds the only non-null value, or a
   *   LinearTransformation spanning the two distinct values. Transformations that cannot be
   *   merged (different data type or unrelated kind) leave this instance unchanged.
   */
  override def merge(other: Transformation): Transformation = other match {
    case linear @ LinearTransformation(min, max, _, newType) if newType == orderedDataType =>
      if (identityValue == null || (lteq(min, identityValue) && gteq(max, identityValue))) linear
      else {
        val minValue = min.min(identityValue)
        val maxValue = max.max(identityValue)
        LinearTransformation(minValue, maxValue, orderedDataType)
      }
    case identity @ IdentityTransformation(newIdValue, newType) if newType == orderedDataType =>
      if (newIdValue == null) this
      else if (identityValue == null) identity
      else if (identityValue == newIdValue) this
      else {
        val minValue = identityValue.min(newIdValue)
        val maxValue = identityValue.max(newIdValue)
        LinearTransformation(minValue, maxValue, orderedDataType)
      }
    // Without this fallback an unmatched transformation would raise a scala.MatchError
    case _ => this
  }

}

/**
 * Jackson serializer for IdentityTransformation. Emits the identity value as a JSON number (or
 * JSON null) under "identityValue", followed by "orderedDataType".
 */
class IdentityTransformationSerializer
    extends StdSerializer[IdentityTransformation](classOf[IdentityTransformation]) {

  /**
   * Writes the "identityValue" and "orderedDataType" fields into the currently open JSON object.
   * Shared by both serialization entry points so that they handle the same set of value types.
   */
  private def writeFields(value: IdentityTransformation, gen: JsonGenerator): Unit = {
    value.identityValue match {
      case null => gen.writeNullField("identityValue")
      case v: Double => gen.writeNumberField("identityValue", v)
      case v: Long => gen.writeNumberField("identityValue", v)
      case v: Int => gen.writeNumberField("identityValue", v)
      case v: Float => gen.writeNumberField("identityValue", v)
      case other =>
        throw new IllegalArgumentException(
          s"Cannot serialize identityValue $other of type ${other.getClass.getName}")
    }
    gen.writeObjectField("orderedDataType", value.orderedDataType)
  }

  /**
   * Serializes the transformation together with its type id, written under the property name
   * provided by the type serializer.
   */
  override def serializeWithType(
      value: IdentityTransformation,
      gen: JsonGenerator,
      serializers: SerializerProvider,
      typeSer: TypeSerializer): Unit = {
    gen.writeStartObject()
    gen.writeStringField(typeSer.getPropertyName, typeSer.getTypeIdResolver.idFromValue(value))
    writeFields(value, gen)
    gen.writeEndObject()
  }

  /**
   * Serializes the transformation without type information. A null identity value is written as
   * JSON null.
   */
  override def serialize(
      value: IdentityTransformation,
      gen: JsonGenerator,
      serializers: SerializerProvider): Unit = {
    gen.writeStartObject()
    writeFields(value, gen)
    gen.writeEndObject()
  }

}

/**
 * Jackson deserializer for IdentityTransformation. Reads "orderedDataType" first and uses it to
 * decide how the "identityValue" node is converted.
 */
class IdentityTransformationDeserializer
    extends StdDeserializer[IdentityTransformation](classOf[IdentityTransformation]) {

  /**
   * Converts the JSON node holding the identity value into the value type matching the data
   * type.
   *
   * @param odt
   *   the ordered data type read from the same JSON object
   * @param tree
   *   the "identityValue" node; a JSON null or an absent field yields null
   * @throws IllegalArgumentException
   *   when the node kind does not match the data type
   */
  private def getTypedValue(odt: OrderedDataType, tree: TreeNode): Any = {
    (odt, tree) match {
      case (IntegerDataType, int: IntNode) => int.asInt
      case (DoubleDataType, double: DoubleNode) => double.asDouble
      case (LongDataType, long: NumericNode) => long.asLong
      case (FloatDataType, float: DoubleNode) => float.floatValue
      case (DecimalDataType, decimal: DoubleNode) => decimal.asDouble
      case (TimestampDataType, timestamp: NumericNode) => timestamp.asLong
      case (DateDataType, date: NumericNode) => date.asLong
      case (_, _: NullNode) => null
      case (_, null) => null
      case (a, b) =>
        throw new IllegalArgumentException(s"Invalid data type ($a,$b) ${b.getClass} ")
    }

  }

  /**
   * Builds an IdentityTransformation from the JSON object at the parser's current position.
   *
   * @throws IllegalArgumentException
   *   when "orderedDataType" is missing or not a text node, or when the identity value does not
   *   match the data type
   */
  override def deserialize(
      p: JsonParser,
      ctxt: DeserializationContext): IdentityTransformation = {
    val tree: TreeNode = p.getCodec.readTree(p)
    val odt = tree.get("orderedDataType") match {
      case tn: TextNode => OrderedDataType(tn.asText())
      // Fail with a descriptive error instead of an opaque scala.MatchError
      case other =>
        throw new IllegalArgumentException(
          s"Invalid or missing orderedDataType for IdentityTransformation: $other")
    }
    val identityValue = getTypedValue(odt, tree.get("identityValue"))
    IdentityTransformation(identityValue, odt)
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,37 +91,18 @@ case class LinearTransformation(

/**
* Merges two transformations. The domain of the resulting transformation is the union of this
* and the other transformation. The range of the resulting transformation is the intersection
* of this and the other transformation, which can be a LinearTransformation or
* IdentityTransformation
* @param other
* @return
* a new Transformation that contains both this and other.
* and the other transformation.
*/
override def merge(other: Transformation): Transformation = {
other match {
case LinearTransformation(otherMin, otherMax, otherNullValue, otherOrdering)
case LinearTransformation(otherMin, otherMax, _, otherOrdering)
if orderedDataType == otherOrdering =>
LinearTransformation(
min(minNumber, otherMin),
max(maxNumber, otherMax),
otherNullValue,
orderedDataType)
.asInstanceOf[Transformation]
LinearTransformation(min(minNumber, otherMin), max(maxNumber, otherMax), orderedDataType)
case IdentityTransformation(newVal, otherType) if orderedDataType == otherType =>
LinearTransformation(min(minNumber, newVal), max(maxNumber, newVal), orderedDataType)
case IdentityToZeroTransformation(newVal) =>
val otherNullValue =
LinearTransformationUtils.generateRandomNumber(
min(minNumber, newVal),
max(maxNumber, newVal),
Option(42.toLong))
val orderedDataType = this.orderedDataType
LinearTransformation(
min(minNumber, newVal),
max(maxNumber, newVal),
otherNullValue,
orderedDataType)
.asInstanceOf[Transformation]

LinearTransformation(min(minNumber, newVal), max(maxNumber, newVal), orderedDataType)
case _ => this
}
}

Expand All @@ -137,13 +118,59 @@ case class LinearTransformation(
case LinearTransformation(newMin, newMax, _, otherOrdering)
if orderedDataType == otherOrdering =>
gt(minNumber, newMin) || lt(maxNumber, newMax)
case IdentityToZeroTransformation(newVal) =>
case IdentityTransformation(newVal, otherType)
if otherType == orderedDataType && newVal != null =>
gt(minNumber, newVal) || lt(maxNumber, newVal)
case IdentityToZeroTransformation(newVal) => gt(minNumber, newVal) || lt(maxNumber, newVal)
case _ => false
}

}

object LinearTransformation {

  /**
   * Creates a LinearTransformation that has random value for the nulls within the [minNumber,
   * maxNumber] range
   * @param minNumber
   *   the minimum value of the transformation
   * @param maxNumber
   *   the maximum value of the transformation
   * @param orderedDataType
   *   the ordered data type of the transformation
   * @param seed
   *   the seed to generate the random null value
   * @return
   *   a LinearTransformation whose null value comes from generateRandomNumber
   */
  def apply(
      minNumber: Any,
      maxNumber: Any,
      orderedDataType: OrderedDataType,
      seed: Option[Long] = None): LinearTransformation = {
    val randomNull = generateRandomNumber(minNumber, maxNumber, seed)
    LinearTransformation(minNumber, maxNumber, randomNull, orderedDataType)
  }

  /**
   * Generates a random number between min and max, with the same numeric type as the inputs.
   *
   * @param min
   *   the lower bound; must be a Double, Long, Int or Float
   * @param max
   *   the upper bound; must have the same type as min
   * @param seed
   *   optional seed that makes the generated value reproducible
   * @throws IllegalArgumentException
   *   when min and max are not both of one of the supported types
   */
  private[transform] def generateRandomNumber(min: Any, max: Any, seed: Option[Long]): Any = {
    val rng = seed match {
      case Some(s) => new Random(s)
      case None => new Random()
    }
    val random = rng.nextDouble()
    (min, max) match {
      case (lo: Double, hi: Double) => lo + (random * (hi - lo))
      case (lo: Long, hi: Long) =>
        val span = hi - lo
        // Same sign test as Math.subtractExact: true when hi - lo does not fit in a Long
        val overflowed = ((hi ^ lo) & (hi ^ span)) < 0
        if (!overflowed) lo + (random * span).toLong
        else {
          // Fall back to double arithmetic and clamp, since rounding may step outside the range
          val approx = (lo.toDouble + random * (hi.toDouble - lo.toDouble)).toLong
          math.max(lo, math.min(hi, approx))
        }
      case (lo: Int, hi: Int) =>
        // Widen to Long so that hi - lo cannot overflow for wide Int ranges
        (lo + (random * (hi.toLong - lo.toLong)).toLong).toInt
      case (lo: Float, hi: Float) => lo + (random * (hi - lo)).toFloat
      case _ =>
        throw new IllegalArgumentException("Cannot generate random number for " +
          s"(min:type, max:type) = ($min: ${typeName(min)}, $max: ${typeName(max)})")
    }
  }

  /** Class name of the value, or "null" so that the error message can be built for null input. */
  private def typeName(value: Any): String =
    if (value == null) "null" else value.getClass.getName

}

class LinearTransformationSerializer
extends StdSerializer[LinearTransformation](classOf[LinearTransformation]) {

Expand Down Expand Up @@ -186,29 +213,6 @@ class LinearTransformationSerializer

}

object LinearTransformation {

  /**
   * Builds a LinearTransformation whose null value is drawn at random via
   * LinearTransformationUtils.generateRandomNumber from the given bounds.
   *
   * @param minNumber
   *   the minimum value of the transformation
   * @param maxNumber
   *   the maximum value of the transformation
   * @param orderedDataType
   *   the ordered data type of the transformation
   * @param seed
   *   optional seed for the random null value
   * @return
   *   the resulting LinearTransformation
   */
  def apply(
      minNumber: Any,
      maxNumber: Any,
      orderedDataType: OrderedDataType,
      seed: Option[Long] = None): LinearTransformation =
    LinearTransformation(
      minNumber,
      maxNumber,
      LinearTransformationUtils.generateRandomNumber(minNumber, maxNumber, seed),
      orderedDataType)

}

class LinearTransformationDeserializer
extends StdDeserializer[LinearTransformation](classOf[LinearTransformation]) {

Expand Down Expand Up @@ -251,33 +255,3 @@ class LinearTransformationDeserializer
}

}

object LinearTransformationUtils {

  /**
   * Generates a random number between min and max, with the same numeric type as the inputs.
   *
   * @param min
   *   the lower bound; must be a Double, Long, Int or Float
   * @param max
   *   the upper bound; must have the same type as min
   * @param seed
   *   optional seed that makes the generated value reproducible
   * @return
   *   the generated number
   * @throws IllegalArgumentException
   *   when min and max are not both of one of the supported types
   */
  def generateRandomNumber(min: Any, max: Any, seed: Option[Long]): Any = {
    val rng = seed match {
      case Some(s) => new Random(s)
      case None => new Random()
    }
    val random = rng.nextDouble()

    (min, max) match {
      case (lo: Double, hi: Double) => lo + (random * (hi - lo))
      case (lo: Long, hi: Long) =>
        val span = hi - lo
        // Same sign test as Math.subtractExact: true when hi - lo does not fit in a Long
        val overflowed = ((hi ^ lo) & (hi ^ span)) < 0
        if (!overflowed) lo + (random * span).toLong
        else {
          // Fall back to double arithmetic and clamp, since rounding may step outside the range
          val approx = (lo.toDouble + random * (hi.toDouble - lo.toDouble)).toLong
          math.max(lo, math.min(hi, approx))
        }
      case (lo: Int, hi: Int) =>
        // Widen to Long so that hi - lo cannot overflow for wide Int ranges
        (lo + (random * (hi.toLong - lo.toLong)).toLong).toInt
      case (lo: Float, hi: Float) => lo + (random * (hi - lo)).toFloat
      case (other, _) =>
        // Null-safe type name so that null input raises the documented exception, not an NPE
        val typeName = if (other == null) "null" else other.getClass.getName
        throw new IllegalArgumentException(
          s"Cannot generate random number for type $typeName")

    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -55,26 +55,17 @@ case class LinearTransformer(columnName: String, dataType: QDataType) extends Tr
names = Seq(colMax, colMin),
predicates = Seq(s"max($columnName) AS $colMax", s"min($columnName) AS $colMin"))

override def makeTransformation(row: String => Any): Transformation = {
val minAux = row(colMin)
val maxAux = row(colMax)
if (minAux == null && maxAux == null) {
// If all values are null,
// we return a Transformation where null values are transformed to 0
NullToZeroTransformation
} else if (minAux == maxAux) {
// If both values are equal we return an IdentityTransformation
IdentityToZeroTransformation(minAux)
} else { // otherwise we pick the min and max
val min = getValue(minAux)
val max = getValue(maxAux)
dataType match {
case ordered: OrderedDataType =>
LinearTransformation(min, max, ordered)

}
}

/**
 * Builds the transformation for this column from the min/max statistics found in the row.
 *
 * For an ordered data type, an IdentityTransformation is produced when either bound is null or
 * both bounds are equal (holding the non-null bound, if any); otherwise a LinearTransformation
 * over the two bounds is produced. Any other data type is rejected.
 *
 * @param row
 *   lookup from statistic column name to its value
 * @throws IllegalArgumentException
 *   when dataType is not an OrderedDataType
 */
override def makeTransformation(row: String => Any): Transformation = dataType match {
  case ordered: OrderedDataType =>
    val lower = getValue(row(colMin))
    val upper = getValue(row(colMax))
    val isSingleValued = lower == null || upper == null || lower == upper
    if (!isSingleValued) LinearTransformation(lower, upper, ordered)
    else {
      // Prefer the lower bound; fall back to the upper one when the lower is null
      val identityValue = if (lower != null) lower else upper
      IdentityTransformation(identityValue, ordered)
    }
  case _ =>
    throw new IllegalArgumentException(
      s"LinearTransformer does not support dataType: $dataType")
}

override protected def transformerType: TransformerType = LinearTransformer
Expand Down
Loading

0 comments on commit f9313b1

Please sign in to comment.