3 changes: 3 additions & 0 deletions great-expectations-wrapper/.gitignore
@@ -0,0 +1,3 @@
__pycache__
venv
dist
92 changes: 92 additions & 0 deletions great-expectations-wrapper/README.md
@@ -0,0 +1,92 @@
# Great Expectations on Spark (EMR)
Great Expectations is a great tool for validating the quality of your data. To get the most out of it, we wanted Great Expectations to be part of our data pipelines.
Whenever a pipeline finishes all of its transformations, we want to run the expectation suites related to that pipeline against the newly transformed data.
This meant we needed a way to run Great Expectations with a configurable data source and expectation suite(s).
For this purpose, we came up with a wrapper that allows us to pass in the data source and expectation suites (among other things).

This project shows how to create such a wrapper so that Great Expectations can be run on a Spark cluster.

The eventual goal is that we can run our Expectation Suites using the following command:
```
generate_dashboard --pipeline PIPELINE_NAME
```

## Storing expectation suites
For our use case, we only want to load the expectation suites that are related to the pipeline. To make this easy, we structure the way we store the suites as follows:
```
.
|-- cli.py
|-- great_expectations
| |-- expectations
| | |-- expectation_definitions_1.json
| | |-- expectation_definitions_2.json
| | |-- expectation_definitions_3.json
|-- suites
| |-- PIPELINE_A
| | |-- pipeline_a_suites.yml
| |-- PIPELINE_B
| | |-- pipeline_b_suites.yml
```
Here the JSON files are the files generated by Great Expectations when you save the Jupyter notebook, and the YAML files look like the following:
```yaml
# pipeline_a_suites.yml
- suite_name: expectation_definitions_1.json
database_name: some_database
table_name: some_table_1
- suite_name: expectation_definitions_2.json
database_name: some_database
table_name: some_table_2
```
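
Loading such a file is then just a matter of parsing the YAML and mapping each entry onto a small record type. Here is a minimal sketch of that step (essentially what `load_suites_config` in `cli.py` further down does):
```python
from collections import namedtuple
from typing import List

import yaml

SuiteInfo = namedtuple("SuiteInfo", ["suite_name", "database_name", "table_name"])


def load_suites_config(suites_config_path: str) -> List[SuiteInfo]:
    """Read a pipeline's suites YAML and turn each entry into a SuiteInfo."""
    with open(suites_config_path) as cp:
        suites = yaml.load(cp, Loader=yaml.FullLoader)
    return [SuiteInfo(**entry) for entry in suites]
```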

## Creating the wrapper
The wrapper is responsible for the following:

- provide the `generate_dashboard` command
- load the correct suites
- create the `DataContext`
- run the validations

We make use of Poetry and Typer to create the wrapper. Let's create a CLI script that is capable of handling our input parameters. For the sake of simplicity, there is no error handling / reporting.
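
As a rough sketch of the skeleton (assuming only Typer is installed, and that the `generate_dashboard` console script is wired up to `main` in `pyproject.toml`, which is not shown in this diff), the CLI can look like this; the full version, including the Great Expectations plumbing, is in `cli.py` further down:
```python
from typing import Optional

import typer

app = typer.Typer()


@app.command("great_expectations_wrapper")
def run(pipeline: str = "", s3_bucket: Optional[str] = None):
    """Validate all expectation suites configured for the given pipeline."""
    # The real wrapper loads suites/<pipeline>/<pipeline>_suites.yml here and
    # runs each suite against its table (see cli.py); this stub only echoes.
    typer.echo(f"Would validate pipeline {pipeline!r}, publishing docs to {s3_bucket!r}")


def main():
    app()


if __name__ == "__main__":
    main()
```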

## Creating and populating a table
In order to have some data to run our expectations on, we will create a simple table on EMR. First off, SSH into your cluster and run `spark-shell`.
Let's start off by creating a database:
```scala
spark.sql("CREATE DATABASE great_expectations_demo")
```

Now that the database is created, let's populate it with some data.

```scala
val df = Seq(
("Alice", "06123456789"),
("Bob", "06987654321")
).toDF("name", "phone")

df.write.option("path", "s3://bram-bucket-emr/great_expectations_demo.db").saveAsTable("great_expectations_demo.users")
```


## Packaging and installing
To package our wrapper, just run the `poetry build` command. It will produce a `.whl` file which can be used to install our wrapper on an EMR cluster.

Copy the file to the EMR cluster with SCP and install it like any other pip package with
```
pip install PATH_TO_WHEEL --user
```
Or copy the `.whl` file to an S3 bucket to which the cluster has access and install it with
```
s3pip install s3://some-s3-bucket/PATH_TO_WHEEL --user
```

## Running
Now that the table we will read the data from has been created and the wrapper is installed on the cluster, it's time to see it in action.
Run
```
generate_dashboard --pipeline pipeline_a --s3-bucket SOME_S3_BUCKET
```

## View results
If no S3 bucket has been provided to the `generate_dashboard` command, the output can be found on the EMR cluster itself.
Go to `/home/hadoop/.local/lib/python3.7/site-packages/great_expectations_emr/great_expectations/uncommitted/data_docs/local_site/validations/SUITE_NAME`,
where `SUITE_NAME` corresponds to the `suite_name` provided in `pipeline_a_suites.yml`.
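
If an S3 bucket was provided, the data docs are published under the `data_doc/` prefix configured in `cli.py`. As a quick check from any machine with `boto3` installed (the bucket name below is just the placeholder from the command above), you can list what was uploaded:
```python
import boto3

s3 = boto3.client("s3")

# List the published data docs; the prefix matches the s3_prefix set in cli.py
response = s3.list_objects_v2(Bucket="SOME_S3_BUCKET", Prefix="data_doc/")
for obj in response.get("Contents", []):
    print(obj["Key"])
```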
144 changes: 144 additions & 0 deletions great-expectations-wrapper/great_expectations_emr/cli.py
@@ -0,0 +1,144 @@
import os
from collections import namedtuple
from datetime import datetime
from typing import List

from great_expectations import DataContext
from great_expectations.checkpoint import LegacyCheckpoint
from pyspark.sql import SparkSession

import typer
import yaml

from great_expectations_emr.helpers import get_relative_path
from great_expectations_emr.utils import update_ge_config

SuiteInfo = namedtuple(
    "SuiteInfo",
    ["suite_name", "database_name", "table_name"],
)


def get_suites(pipeline: str) -> List[SuiteInfo]:
    """Retrieve all the suites that are related to the provided pipeline

    :param pipeline: the pipeline to retrieve the suites for
    :return: A list of SuiteInfo tuples
    """
    suite_location = get_relative_path(f"suites/{pipeline}/{pipeline}_suites.yml")
    suites = load_suites_config(suite_location)
    return suites


def load_suites_config(suites_config_path: str) -> List[SuiteInfo]:
    """Read in the YAML suites and convert them to SuiteInfo tuples

    :param suites_config_path: the location of the suites
    :return: a list of SuiteInfo tuples
    """
    with open(suites_config_path) as cp:
        suites = yaml.load(cp, Loader=yaml.FullLoader)

    suites = [SuiteInfo(**args) for args in suites]
    return suites


def get_run_id(suite: str, time_format: str = "%Y-%m-%d.%H:%M:%S") -> str:
    """Generate a run id based on the current suite and timestamp"""
    timestamp = datetime.now().strftime(time_format)
    return f"{suite}.{timestamp}"


app = typer.Typer()
DEFAULT_SPARK_HOME = "/usr/lib/spark"
DEFAULT_CONTEXT_ROOT = get_relative_path("great_expectations")
APP_NAME = "great_expectations_wrapper"


@app.command(APP_NAME)
def run(
    pipeline: str = "",
    context_root_dir: str = DEFAULT_CONTEXT_ROOT,
    s3_bucket: str = None
):
    """Main function that holds the logic to run the expectation suites for a pipeline

    :param pipeline: the pipeline for which to generate dashboards
    :param context_root_dir: root directory of the Great Expectations project
    :param s3_bucket: optional S3 bucket to publish the data docs to
    """
    # Set the SPARK_HOME env var. This is necessary on EMR 6 since it's not already set
    current = os.environ.get("SPARK_HOME")
    if not current:
        os.environ["SPARK_HOME"] = DEFAULT_SPARK_HOME

    # You probably want to check if the pipeline is passed
    print(context_root_dir)
    suites = get_suites(pipeline)
    print("Suites have been loaded")

    keep_s3_history = False
    s3_prefix = "data_doc/"
    update_ge_config(context_root_dir, s3_bucket, keep_s3_history, s3_prefix)

    for suite in suites:
        print(f"Working on suite {suite}")
        result = generate_dashboard(
            suite.suite_name,
            suite.database_name,
            suite.table_name,
            app_name=APP_NAME,
            context_root_dir=context_root_dir
        )

        print("Success!") if result else print("Failed!")


def generate_dashboard(
    suite_name: str,
    database_name: str,
    table_name: str,
    app_name: str,
    spark_session: SparkSession = None,
    context_root_dir: str = "great_expectations"
) -> bool:
    if not spark_session:
        print("Creating spark session")
        spark_session = SparkSession.builder.appName(app_name) \
            .enableHiveSupport() \
            .getOrCreate()

    # Create a DataContext for the provided suite
    context = DataContext(context_root_dir)
    df = spark_session.table(f"{database_name}.{table_name}")

    batch_kwargs = {"dataset": df, "datasource": "spark_datasource"}

    # Making use of the new checkpoints instead of the validation operator
    checkpoint = LegacyCheckpoint(
        name=app_name,
        data_context=context,
        batches=[
            {
                "batch_kwargs": batch_kwargs,
                "expectation_suite_names": [suite_name]
            }
        ]
    )

    # Run the checkpoint
    results = checkpoint.run()

    context.build_data_docs()

    if not results["success"]:
        print("No results")
        return False
    print("Data docs have been built")
    return True


def main():
    app()
82 changes: 82 additions & 0 deletions great-expectations-wrapper/great_expectations_emr/config.py
@@ -0,0 +1,82 @@
from typing import Any, Dict

import yaml

from great_expectations_emr import s3_store


class GreatExpectationConfig:
    """GreatExpectationConfig is a helper which loads the default Great Expectations
    config and dynamically changes it based on the provided input. The default
    config is configured to run on a local machine. However, using this dynamic
    config interface, it's possible to automatically change the config file at
    run time.

    :param path: The default config file path.
    :type path: str

    Usage::
        >>> gce = GreatExpectationConfig("/path/to/default_ge.conf")
        >>> gce.set_s3_site("bucket_name", "site_name")
        >>> gce.save()
    """

    def __init__(self, path: str):
        self.path = path
        self.config = self.__read_config__(self.path)

    @staticmethod
    def __read_config__(path: str) -> Dict[Any, Any]:
        """Read the config file and parse it from YAML to a Python dict.

        :param path: The config file's path to read.
        :type path: str
        """

        with open(path) as fp:
            content = yaml.load(fp, Loader=yaml.FullLoader)

        return content

    @staticmethod
    def __write_config__(data: dict, path: str) -> None:
        """Write the config file in YAML format to the given path.

        :param data: The data object to write.
        :type data: dict
        :param path: The config file's path to write data to.
        :type path: str
        """

        with open(path, "w") as fp:
            yaml.dump(data, fp)

    def set_s3_site(self, bucket: str, site_name: str = "s3_site", s3_prefix: str = None) -> None:
        """Configure the loaded config to use an S3 site to deploy data docs to.

        :param bucket: The S3 bucket name.
        :type bucket: str
        :param site_name: The deployed data docs site name.
        :type site_name: str
        :param s3_prefix: The prefix within the S3 bucket to deploy the data docs under.
        :type s3_prefix: str
        """

        self.config["data_docs_sites"] = s3_store.s3_site(bucket, site_name, s3_prefix)

    def set_s3_validation_store(self, bucket: str) -> None:
        """Set the S3 bucket to store validation results in.

        :param bucket: The S3 bucket name.
        :type bucket: str
        """
        self.config["stores"]["validations_store"] = s3_store.validation_store(bucket)

    def save(self, path: str = None) -> None:
        """Save the current config object to the given path in the proper format.

        :param path: The path to store the config in.
        :type path: str
        """

        self.__write_config__(self.config, path or self.path)
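
The `s3_store` module imported above is not part of this diff. Purely as a hypothetical sketch of what its helpers could return, based on the standard Great Expectations 0.13 `data_docs_sites` and `validations_store` configuration (the real module's keys may differ):
```python
# Hypothetical sketch of great_expectations_emr/s3_store.py -- not part of this diff.
from typing import Any, Dict, Optional


def s3_site(bucket: str, site_name: str = "s3_site", s3_prefix: Optional[str] = None) -> Dict[str, Any]:
    """Return a data_docs_sites entry that publishes the data docs to S3."""
    return {
        site_name: {
            "class_name": "SiteBuilder",
            "store_backend": {
                "class_name": "TupleS3StoreBackend",
                "bucket": bucket,
                "prefix": s3_prefix or "",
            },
        }
    }


def validation_store(bucket: str) -> Dict[str, Any]:
    """Return a validations_store entry backed by S3."""
    return {
        "class_name": "ValidationsStore",
        "store_backend": {
            "class_name": "TupleS3StoreBackend",
            "bucket": bucket,
            "prefix": "validations/",
        },
    }
```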
@@ -0,0 +1,49 @@
{
  "data_asset_type": "SparkDFDataset",
  "expectation_suite_name": "expectations_spark_tutorial",
  "expectations": [
    {
      "expectation_type": "expect_table_columns_to_match_ordered_list",
      "kwargs": {
        "column_list": [
          "name",
          "phone"
        ]
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "name"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_be_of_type",
      "kwargs": {
        "column": "name",
        "type_": "StringType"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "phone"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_be_of_type",
      "kwargs": {
        "column": "phone",
        "type_": "StringType"
      },
      "meta": {}
    }
  ],
  "meta": {
    "great_expectations.__version__": "0.13.10"
  }
}