Quality Check

This project is implemented using the Spark, circe (JSON), and Cats frameworks.
1. This framework can be used to run data quality checks on different data sources. Hive and JDBC sources such as MySQL are supported out of the box, and it can be extended to other data sources.
2. Supported checks: row count check, null check, unique check, and distinct check. It can be extended with other checks.
3. After the checks run, it builds a DataFrame report. This DataFrame can be used to compare data correctness across multiple data sources.
4. Reports can be exported as JSON, CSV, or a DataFrame.
5. Predicate pushdown is supported. Loading data from a JDBC source into Spark and then running the checks may be
   inefficient, so the check queries can instead be sent to the store. By default, checks run on the DataFrame;
   set predicatePush = "true" to enable predicate pushdown.
   Ex: src/test/resources/qualityCheck/mysql

Config file template

src/test/resources/application.conf
Create a config file for each job in your project and put the appropriate checks in it.
The sources config parameter is an array, so you can configure data quality checks for multiple tables (a sketch of reading this structure follows the template below).
qualityCheck {
  sources = [
    {
      type = "hive"
      dbName = "db1"
      tableName = "table1"
      query = "query1"
      checks {
        rowCountCheck {
          count = 0,
          relation = "gt"
        }
        nullCheck = [ "colA", "colB",  "colC", "colD"]
        uniqueChecks = [
          ["colA", "colB",  "colC", "colD"],
          ["colX", "colY",  "colZ"]
          ["colM", "colN"]
        ]
        distinctChecks = [
          {columns = ["colA"], count = 1600, relation = "ge"},
          {columns = ["colZ", "colB"], count = 1, relation = "ge"},
          {columns = ["colY"], count = 1, relation = "ge"},
          {columns = ["colM"], count = 1, relation = "ge"}
        ]
      }
    },
    {
      type = "mysql"
      dbName = "classicmodels"
      tableName = "employees"
      options {
        url = "jdbc:mysql://localhost:3306/classicmodels"
        user = "hduser"
        password = "hadoop123"
        driver = "com.mysql.cj.jdbc.Driver"
      }
      predicatePush = "true"
      checks {
        rowCountCheck {
          count = 0,
          relation = "gt"
        }
        nullCheck = ["colA", "colB",  "colC", "colD"]
        uniqueChecks = [
          ["colA", "colB",  "colC", "colD"],
          ["colX", "colY",  "colZ"]
          ["colM", "colN"]
        ]
        distinctChecks = [
          {columns = ["colA"], count = 1600, relation = "ge"},
          {columns = ["colZ", "colB"], count = 1, relation = "ge"},
          {columns = ["colY"], count = 1, relation = "ge"},
          {columns = ["colM"], count = 1, relation = "ge"}
        ]
      }
    }
  ]
}
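
A minimal sketch of reading this structure with the Typesafe Config library (the framework's own config model classes may differ):

import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._   // scala.jdk.CollectionConverters on Scala 2.13+

// Parse the job's config file and walk the sources array.
val config  = ConfigFactory.parseFile(new java.io.File("src/test/resources/application.conf"))
val sources = config.getConfigList("qualityCheck.sources").asScala

sources.foreach { source =>
  val table = source.getString("dbName") + "." + source.getString("tableName")
  println(source.getString("type") + " -> " + table)
}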

Important Note

If the query field is specified, all checks are done on the query data; otherwise, all checks are done on the source data.

Config details

sourceQualityCheck: all the checks for a source must be specified inside this block
type: identifies the source. It could be Hive, Teradata, csv, orc, etc. Currently Hive and JDBC sources such as MySQL are supported.
dbName: Database name
tableName: Table name
The framework checks whether the table exists; if it does not, it throws an error and exits the program.
query = any query that you want to run on this table. Checks will be done on the data returned by this query.


options {
        url = "jdbc:mysql://localhost:3306/classicmodels"
        user = "hduser"
        password = "hadoop123"
        driver = "com.mysql.cj.jdbc.Driver"
      }

The options block provides the JDBC connection info for JDBC sources.

predicatePush = "true"
If true, the check queries are run on the source (predicate pushdown).
If false, the data is loaded from the source into Spark and the checks run on the DataFrame, which may not be efficient.
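
As a rough illustration of the pushdown idea (not the framework's internal code), a check query can be wrapped as a JDBC subquery so that only the aggregated result is pulled into Spark. The connection values are the ones from the sample config above, and spark is assumed to be an active SparkSession:

// Illustrative sketch: push a row-count check down to MySQL instead of
// loading the whole table into Spark.
val rowCountDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/classicmodels")
  .option("user", "hduser")
  .option("password", "hadoop123")
  .option("driver", "com.mysql.cj.jdbc.Driver")
  .option("dbtable", "(SELECT COUNT(*) AS row_count FROM employees) AS t")
  .load()

// Compared against the configured count/relation (here: count = 0, relation = "gt").
val actual = rowCountDf.first().getLong(0)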

checks:
  rowCountCheck = { count = 0, relation = "gt" }
  It computes the total row count for the given data. If the count does not satisfy the relation, it throws an error and exits the program.
  Supported relational operators: gt, ge, lt, le, and eq


  nullCheck = [ "colA", "colB",  "colC", "colD"]
  Checks whether these columns contain nulls. If any of them does, it throws an error and exits the program.
  
  
  uniqueChecks = [
    ["colA", "colB",  "colC", "colD"],
    ["colX", "colY",  "colZ"]
    ["colM", "colN"]
  ]
  Checks whether duplicate records exist for each given combination of columns.
  If duplicates exist, it throws an error and exits the program.
  
  
  distinctChecks = [
    {columns = ["colA"], count = 1600, relation = "ge"},
    {columns = ["colZ", "colB"], count = 1, relation = "ge"},
    {columns = ["colY"], count = 1, relation = "ge"},
    {columns = ["colM"], count = 1, relation = "ge"}
  ]
  Checks the distinct count of each specified column set.
  If it does not satisfy the relation, it throws an error and exits the program.
  relation specifies how the actual distinct count must compare to the configured count, e.g. greater than (gt).
  Supported relational operators: gt, ge, lt, le, and eq
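
To make the semantics of these checks concrete, here is a minimal sketch (not the framework's internal implementation) of how each stat could be computed on a Spark DataFrame and compared against the configured count and relation:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, countDistinct}

// Evaluate a relation between the computed value and the configured value.
def holds(actual: Long, configValue: Long, relation: String): Boolean = relation match {
  case "gt" => actual > configValue
  case "ge" => actual >= configValue
  case "lt" => actual < configValue
  case "le" => actual <= configValue
  case "eq" => actual == configValue
}

def rowCountStat(df: DataFrame): Long = df.count()                        // rowCountCheck
def nullStat(df: DataFrame, c: String): Long =
  df.filter(col(c).isNull).count()                                        // nullCheck: expected to be 0
def uniqueStat(df: DataFrame, cols: Seq[String]): Long =
  df.groupBy(cols.map(col): _*).count().filter(col("count") > 1).count()  // uniqueChecks: duplicate groups, expected 0
def distinctStat(df: DataFrame, cols: Seq[String]): Long =
  df.select(countDistinct(cols.head, cols.tail: _*)).first().getLong(0)   // distinctChecks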

Class Name

com.pixipanda.qualitycheck.Main

How to run this program

Local: Run this test case to get a demo of the framework.
src/test/scala/com/pixipanda/qualitycheck/report/ReportBuilderSpec.scala
Cluster:
Create a shell script in your project to submit the Spark job, and optionally include it as an action in your workflow.xml, for example:
<action name="qualityCheck">
    <ssh xmlns="uri:oozie:ssh-action:0.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="uri:oozie:ssh-action:0.2 ">
        <host>${applicationUserName}@${hostName}</host>
        <command>${qualityCheckSparkSubmitScript}</command>
        <arg>${appName}</arg>
        <arg>${qualityCheckClassName}</arg>
        <arg>${master}</arg>
        <arg>${deployMode}</arg>
        <arg>${queueName}</arg>
        <arg>${qualityCheckConfigFile}</arg>
        <arg>${qualityCheckJarLocation}</arg>
    </ssh>
    <ok to="end"/>
    <error to="kill"/>
</action>

Output

Success report on the source data:

+----------------+------------+--------+--------+-----------+------+----------+----------+
|label           |statName    |columns |relation|configValue|actual|validation|jobRunDate|
+----------------+------------+--------+--------+-----------+------+----------+----------+
|testDb_testTable|rowCountStat|NA      |gt      |0          |4     |success   |2020-06-28|
|testDb_testTable|nullStat    |quantity|eq      |0          |0     |success   |2020-06-28|
|testDb_testTable|distinctStat|item    |ge      |2          |4     |success   |2020-06-28|
|testDb_testTable|uniqueStat  |price   |eq      |0          |0     |success   |2020-06-28|
|testDb_testTable|uniqueStat  |quantity|eq      |0          |0     |success   |2020-06-28|
|testDb_testTable|uniqueStat  |item    |eq      |0          |0     |success   |2020-06-28|
+----------------+------------+--------+--------+-----------+------+----------+----------+

"actual" column contains computed values for all the checks for all the sources(data stores)
This column could be used to compare multiple sources(data stores)
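
A minimal sketch, assuming the report DataFrame is bound to a variable named report and that the two labels below exist in it (both names are hypothetical):

import org.apache.spark.sql.functions.col

// Put the "actual" values of two sources side by side for comparison.
val hiveStats  = report.filter(col("label") === "db1_table1").alias("h")
val mysqlStats = report.filter(col("label") === "classicmodels_employees").alias("m")

hiveStats
  .join(mysqlStats, Seq("statName", "columns"))
  .select(col("statName"), col("columns"),
          col("h.actual").as("hive_actual"), col("m.actual").as("mysql_actual"))
  .show(false)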



Failure report on the query data; here the nullCheck failed:
+----------------------+------------+--------+--------+-----------+------+----------+----------+
|label                 |statName    |columns |relation|configValue|actual|validation|jobRunDate|
+----------------------+------------+--------+--------+-----------+------+----------+----------+
|testDb_testTable_query|rowCountStat|NA      |gt      |0          |4     |success   |2020-06-28|
|testDb_testTable_query|nullStat    |quantity|eq      |0          |1     |failed    |2020-06-28|
+----------------------+------------+--------+--------+-----------+------+----------+----------+
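
Since the report is a regular Spark DataFrame, it can also be persisted with the standard writers when a JSON or CSV copy is needed (the report variable name and output paths below are illustrative):

// Export the quality-check report as JSON and CSV.
report.write.mode("overwrite").json("/tmp/quality_check_report_json")
report.write.mode("overwrite").option("header", "true").csv("/tmp/quality_check_report_csv")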

API to get the stats

Pass the Typesafe Config object parsed from your config file:
com.pixipanda.qualitycheck.api.StatApi.getStats(config)
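
For example, assuming the sample config file from the tests (the stats variable name is illustrative; see the Output section for the report it produces):

import com.typesafe.config.ConfigFactory
import com.pixipanda.qualitycheck.api.StatApi

// Parse the config file with Typesafe Config and hand it to the stats API.
val config = ConfigFactory.parseFile(new java.io.File("src/test/resources/application.conf"))
val stats  = StatApi.getStats(config)   // runs the configured checks and builds the report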
