2 changes: 2 additions & 0 deletions README.md
@@ -324,6 +324,8 @@ This check sums a column in all rows. If the sum applied to the `column` doesn't
| `maxValue` | NumericType | The upper bound of the sum. Type depends on the type of the `column`. |
| `inclusive` | Boolean | Include `minValue` and `maxValue` as part of the range. |

**Note:** If the bounds are non-inclusive and the actual sum equals one of the bounds, the check fails, but the distance to the violated bound is zero, so the relative error percentage is reported as `undefined` rather than `0.00%` (see the sketch under Example Config below).

## Example Config

```yaml
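# A hedged sketch of a single columnSumCheck entry; the column name and bounds
# are illustrative, and the surrounding table/check structure is omitted.
- type: columnSumCheck
  column: price
  minValue: 0
  maxValue: 100
  inclusive: false   # a sum of exactly 100 fails, with relative error "undefined"
```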
6 changes: 6 additions & 0 deletions src/main/scala/com/target/data_validator/JsonEncoders.scala
@@ -47,6 +47,12 @@ object JsonEncoders extends LazyLogging {
("count", Json.fromLong(vce.count)),
("errorCount", Json.fromLong(vce.errorCount))
)
case cbvce: ColumnBasedValidatorCheckEvent => Json.obj(
("type", Json.fromString("columnBasedCheckEvent")),
("failed", Json.fromBoolean(cbvce.failed)),
("message", Json.fromString(cbvce.msg)),
("data", Json.fromFields(cbvce.data.map(x => (x._1, Json.fromString(x._2)))))
)
case qce: ValidatorQuickCheckError => Json.obj(
("type", Json.fromString("quickCheckError")),
("failed", Json.fromBoolean(qce.failed)),
12 changes: 12 additions & 0 deletions src/main/scala/com/target/data_validator/ValidatorEvent.scala
@@ -36,6 +36,18 @@ case class ValidatorCheckEvent(failure: Boolean, label: String, count: Long, err
}
}

case class ColumnBasedValidatorCheckEvent(
failure: Boolean,
data: Map[String, String],
msg: String
) extends ValidatorEvent {
override def failed: Boolean = failure

override def toHTML: Text.all.Tag = {
div(cls:="checkEvent")(failedHTML, s" - $msg")
}
}
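
For reference, the `columnBasedCheckEvent` encoder case added in `JsonEncoders.scala` serializes such an event roughly as follows; the values are taken from the failing `MinNumRows` test below, and the output is pretty-printed here for readability (a sketch, not verbatim encoder output):

```json
{
  "type": "columnBasedCheckEvent",
  "failed": true,
  "message": "MinNumRowsCheck Expected: 10 Actual: 2 Relative Error: 80.00%",
  "data": {
    "expected": "10",
    "actual": "2",
    "relative_error": "80.00%"
  }
}
```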

class ValidatorTimer(val label: String) extends ValidatorEvent {
var duration = 0L

@@ -1,6 +1,6 @@
package com.target.data_validator.validator

import com.target.data_validator.{ValidatorCheckEvent, ValidatorCounter, ValidatorError, VarSubstitution}
import com.target.data_validator.{ColumnBasedValidatorCheckEvent, ValidatorCounter, ValidatorError, VarSubstitution}
import com.target.data_validator.JsonEncoders.eventEncoder
import io.circe.Json
import io.circe.syntax._
@@ -10,11 +10,29 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.Max
import org.apache.spark.sql.types._

import scala.collection.immutable.ListMap
import scala.math.abs

abstract class ColumnBased(column: String, condTest: Expression) extends CheapCheck {
override def select(schema: StructType, dict: VarSubstitution): Expression = condTest

// ColumnBased checks don't have per row error details.
def hasQuickErrorDetails: Boolean = false

// Calculates the relative error, |expected - actual| * 100 / expected, formatted as a string.
def calculatePctError(expected: Double, actual: Double, formatStr: String = "%4.2f%%"): String = {
if (expected == actual) {
formatStr.format(0.00) // if expected == actual, error % should be 0, even if expected is 0
} else if (expected == 0.0) {
"undefined" // avoid division by zero
} else {
val pct = abs(((expected - actual) * 100.0) / expected)
formatStr.format(pct)
}
}
}
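
For intuition, here is a self-contained sketch of the same formula with illustrative values (mirroring the `MinNumRows` expectations in the updated tests; not the class itself):

```scala
object PctErrorExample extends App {
  // Mirrors calculatePctError's logic for illustration.
  def pctError(expected: Double, actual: Double): String =
    if (expected == actual) "%4.2f%%".format(0.00)
    else if (expected == 0.0) "undefined"
    else "%4.2f%%".format(math.abs((expected - actual) * 100.0 / expected))

  println(pctError(10, 2)) // "80.00%": |10 - 2| * 100 / 10
  println(pctError(0, 5))  // "undefined": avoids division by zero
}
```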

case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0) {
@@ -36,8 +54,11 @@ case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0

override def quickCheck(row: Row, count: Long, idx: Int): Boolean = {
failed = count < minNumRows
val pctError = if (failed) calculatePctError(minNumRows, count) else "0.00%"
addEvent(ValidatorCounter("rowCount", count))
addEvent(ValidatorCheckEvent(failed, s"MinNumRowCheck $minNumRows ", count, 1))
val msg = s"MinNumRowsCheck Expected: $minNumRows Actual: $count Relative Error: $pctError"
val data = ListMap("expected" -> minNumRows.toString, "actual" -> count.toString, "relative_error" -> pctError)
addEvent(ColumnBasedValidatorCheckEvent(failed, data, msg))
failed
}

@@ -66,34 +87,58 @@ case class ColumnMaxCheck(column: String, value: Json)
val dataType = row.schema(idx).dataType
val rMax = row(idx)
logger.info(s"rMax: $rMax colType: $dataType value: $value valueClass: ${value.getClass.getCanonicalName}")
val num = value.asNumber
failed = dataType match {
case StringType => value.asString.exists(_ != row.getString(idx))
case ByteType => num.map(_.toByte).exists(_.get != row.getByte(idx))
case ShortType => num.map(_.toShort).exists(_.get != row.getShort(idx))
case IntegerType =>
val intNum = value.asNumber.map(_.toInt.get).getOrElse(-1)
val rowInt = row.getInt(idx)
logger.debug(s"intNum[${intNum.getClass.getCanonicalName}]: $intNum " +
s"rowInt[${rowInt.getClass.getCanonicalName}]: $rowInt")
num.map(_.toInt).exists(_.get != row.getInt(idx))
case LongType => num.map(_.toLong).exists(_.get != row.getLong(idx))
case FloatType => num.forall(_.toDouble != row.getFloat(idx))
case DoubleType => num.forall(_.toDouble != row.getDouble(idx))
case ut =>
logger.error(s"quickCheck for type: $ut, Row: $row not Implemented! Please file this as a bug.")
true // Fail check!

def resultForString: (ListMap[String, String], String) = {
val (expected, actual) = (value.asString.getOrElse(""), row.getString(idx))

failed = expected != actual
val data = ListMap("expected" -> expected, "actual" -> actual)
val errorMsg = s"ColumnMaxCheck $column[StringType]: Expected: $expected Actual: $actual"

(data, errorMsg)
}

def resultForNumeric: (ListMap[String, String], String) = {
val num = value.asNumber.get
// (expected, actual) as doubles; -1 stands in when the configured value doesn't fit the column type
val (expected, actual) = dataType match {
case ByteType => (num.toByte.getOrElse[Byte](-1).toDouble, row.getByte(idx).toDouble)
case ShortType => (num.toShort.getOrElse[Short](-1).toDouble, row.getShort(idx).toDouble)
case IntegerType => (num.toInt.getOrElse[Int](-1).toDouble, row.getInt(idx).toDouble)
case LongType => (num.toLong.getOrElse[Long](-1).toDouble, row.getLong(idx).toDouble)
case FloatType => (num.toDouble, row.getFloat(idx).toDouble)
case DoubleType => (num.toDouble, row.getDouble(idx))
}

failed = expected != actual
val pctError = if (failed) calculatePctError(expected, actual) else "0.00%"
val data = ListMap("expected" -> num.toString, "actual" -> rMax.toString, "relative_error" -> pctError)
val errorMsg = s"ColumnMaxCheck $column[$dataType]: Expected: $num Actual: $rMax Relative Error: $pctError"

(data, errorMsg)
}

def resultForOther: (ListMap[String, String], String) = {
logger.error(
s"""ColumnMaxCheck for type: $dataType, Row: $row not implemented!
|Please open a bug report on the data-validator issue tracker.""".stripMargin
)
failed = true
val errorMsg = s"ColumnMaxCheck is not supported for data type $dataType"

(ListMap.empty[String, String], errorMsg)
}

val (data, errorMsg) = dataType match {
case StringType => resultForString
case _: NumericType => resultForNumeric
case _ => resultForOther
}

logger.debug(s"MaxValue compared Row: $row with value: $value failed: $failed")
if (failed) {
addEvent(
ValidatorCheckEvent(
failed,
s"columnMaxCheck column[$dataType]: $column value: $value doesn't equal $rMax",
count,
1
)
)
addEvent(ColumnBasedValidatorCheckEvent(failed, data, errorMsg))
}
failed
}
@@ -1,13 +1,16 @@
package com.target.data_validator.validator

import com.target.data_validator.{ValidatorCheckEvent, ValidatorError, VarSubstitution}
import com.target.data_validator.{ColumnBasedValidatorCheckEvent, JsonEncoders, ValidatorError, VarSubstitution}
import io.circe._
import io.circe.generic.semiauto._
import io.circe.syntax._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum
import org.apache.spark.sql.types._

import scala.collection.immutable.ListMap

case class ColumnSumCheck(
column: String,
minValue: Option[Json] = None,
@@ -52,29 +55,64 @@ case class ColumnSumCheck(

override def quickCheck(r: Row, count: Long, idx: Int): Boolean = {

val dataType = r.schema(idx).dataType
val isInclusive = inclusiveBounds.right.get
val lowerBoundValue = lowerBound.right.get
val upperBoundValue = upperBound.right.get

def evaluate(sum: Double): Boolean = {
if (inclusiveBounds.right.get) { sum > upperBound.right.get || sum < lowerBound.right.get}
else { sum >= upperBound.right.get || sum <= lowerBound.right.get}
if (isInclusive) { sum > upperBoundValue || sum < lowerBoundValue }
else { sum >= upperBoundValue || sum <= lowerBoundValue }
}

def getPctError(sum: Double): String = {
if (sum < lowerBoundValue) {
calculatePctError(lowerBoundValue, sum)
} else if (sum > upperBoundValue) {
calculatePctError(upperBoundValue, sum)
} else if (!isInclusive && (sum == upperBoundValue || sum == lowerBoundValue)) {
"undefined"
} else {
"0.00%"
}
}

failed = r.schema(idx).dataType match {
case ShortType => evaluate(r.getShort(idx))
case IntegerType => evaluate(r.getInt(idx))
case LongType => evaluate(r.getLong(idx))
case FloatType => evaluate(r.getFloat(idx))
case DoubleType => evaluate(r.getDouble(idx))
case ByteType => evaluate(r.getByte(idx))
def getData(pctError: String): ListMap[String, String] = {
((minValue, maxValue) match {
case (Some(x), Some(y)) =>
ListMap("lower_bound" -> x.asNumber.get.toString, "upper_bound" -> y.asNumber.get.toString)
case (None, Some(y)) => ListMap("upper_bound" -> y.asNumber.get.toString)
case (Some(x), None) => ListMap("lower_bound" -> x.asNumber.get.toString)
case (None, None) => throw new RuntimeException("Must define at least one of minValue or maxValue.")
}) + ("inclusive" -> isInclusive.toString, "actual" -> r(idx).toString, "relative_error" -> pctError)
}

val actualSum: Double = dataType match {
case ByteType => r.getByte(idx)
case ShortType => r.getShort(idx)
case IntegerType => r.getInt(idx)
case LongType => r.getLong(idx)
case FloatType => r.getFloat(idx)
case DoubleType => r.getDouble(idx)
case ut => throw new Exception(s"Unsupported type for $name found in schema: $ut")
}

val bounds = minValue.getOrElse("") :: maxValue.getOrElse("") :: Nil
val prettyBounds = if (inclusiveBounds.right.get) {
r.get(idx) + " in " + bounds.mkString("[", " , ", "]")
failed = evaluate(actualSum)
val pctError = getPctError(actualSum)
val data = getData(pctError)

val bounds = minValue.getOrElse(" ") :: maxValue.getOrElse("") :: Nil
val prettyBounds = if (isInclusive) {
bounds.mkString("[", ", ", "]")
} else {
r.get(idx) + " in " + bounds.mkString("(", " , ", ")")
bounds.mkString("(", ", ", ")")
}
val errorValue = if (failed) 1 else 0
addEvent(ValidatorCheckEvent(failed, s"$name on '$column': $prettyBounds", count, errorValue))

val msg = s"$name on $column[$dataType]: Expected Range: $prettyBounds Actual: ${r(idx)} Relative Error: $pctError"
addEvent(ColumnBasedValidatorCheckEvent(failed, data, msg))
failed
}
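
To make the boundary semantics of `evaluate` and `getPctError` concrete, a standalone sketch (the bounds are assumed values):

```scala
object SumBoundsExample extends App {
  val (lower, upper) = (0.0, 100.0) // assumed bounds

  // Mirrors evaluate(): the check fails when the sum falls outside the configured range.
  def fails(sum: Double, inclusive: Boolean): Boolean =
    if (inclusive) sum > upper || sum < lower
    else sum >= upper || sum <= lower

  println(fails(100.0, inclusive = true))  // false: 100 lies inside [0, 100], relative error "0.00%"
  println(fails(100.0, inclusive = false)) // true: 100 lies outside (0, 100), relative error "undefined"
}
```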

@@ -119,9 +157,11 @@ case class ColumnSumCheck(
}

override def toJson: Json = {
import JsonEncoders.eventEncoder
val additionalFieldsForReport = Json.fromFields(Set(
"type" -> Json.fromString("columnSumCheck"),
"failed" -> Json.fromBoolean(failed)
"failed" -> Json.fromBoolean(failed),
"events" -> getEvents.asJson
))

val base = ColumnSumCheck.encoder(this)
17 changes: 15 additions & 2 deletions src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala
@@ -10,6 +10,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types._
import org.scalatest._

import scala.collection.immutable.ListMap
import scala.util.Random

class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession {
@@ -182,20 +183,32 @@ class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession {
it("quickCheck() should fail when rowCount < minNumRows") {
val dict = new VarSubstitution
val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore
val config = mkConfig(df, List(MinNumRows(10))) //scalastyle:ignore
val minNumRowsCheck = MinNumRows(10) // scalastyle:ignore magic.number
val config = mkConfig(df, List(minNumRowsCheck))
assert(config.quickChecks(spark, dict))
assert(config.failed)
assert(config.tables.head.failed)
assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent(
failure = true,
ListMap("expected" -> "10", "actual" -> "2", "relative_error" -> "80.00%"),
"MinNumRowsCheck Expected: 10 Actual: 2 Relative Error: 80.00%"
))
}

it("quickCheck() should succeed when rowCount > minNumRows") {
val dict = new VarSubstitution
val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore
val config = mkConfig(df, List(MinNumRows(1))) //scalastyle:ignore
val minNumRowsCheck = MinNumRows(1)
val config = mkConfig(df, List(minNumRowsCheck))
assert(!config.configCheck(spark, dict))
assert(!config.quickChecks(spark, dict))
assert(!config.failed)
assert(!config.tables.exists(_.failed))
assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent(
failure = false,
ListMap("expected" -> "1", "actual" -> "2", "relative_error" -> "0.00%"),
"MinNumRowsCheck Expected: 1 Actual: 2 Relative Error: 0.00%"
))
}

}