Skip to content

Commit

Permalink
added great_expectations validation layer
Browse files Browse the repository at this point in the history
  • Loading branch information
pevolution-ahmed committed Sep 8, 2022
1 parent 224999f commit 91fe219
Show file tree
Hide file tree
Showing 18 changed files with 365 additions and 34 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
# stock-market-analytics-dataops
# Stock Market Analytics DataOps Workflow

## Projects TO-DO:
- Save stocks ticker data from yahoo finance to Google BigQuery
- Create a Great Expectations Suite and Checkpoints using the Great Expectations package to validate and test the loaded data (Test suite)
- Set up a dbt-core project as a transformation layer above the source data

Binary file modified dags/__pycache__/etl_dag.cpython-310.pyc
Binary file not shown.
50 changes: 19 additions & 31 deletions dags/etl_dag.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import os
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from airflow.providers.google.cloud.operators.bigquery import (
BigQueryGetDatasetOperator,
BigQueryCreateEmptyDatasetOperator
)
from great_expectations_provider.operators.great_expectations import (
GreatExpectationsOperator,
)
from packages.stock_data_access.stock_data import _save_stocks_data_to_gbq
import os

DATASET_NAME = 'stocks_analytics'


DATASET_NAME = 'stocks_storage'
sa_path = os.environ.get('SERVICE_ACCOUNT_JSON_PATH')
GE_ROOT_DIR = os.getcwd() + "/great_expectations"

with DAG(
"stocks-etl",
Expand All @@ -24,32 +27,17 @@
# if it's exists check if the table exists or not using a branch operator again:
# if it's not exists create the table then che
# if it's exists

get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)
On_previous_faild_task = BashOperator(
task_id = "On_previous_faild_task",
bash_command="echo 'BigQueryGetDatasetOperator is Faild therefore this Task is running'",
trigger_rule='all_failed'
)
On_previous_success_task = BashOperator(
task_id = "On_previous_success_task",
bash_command="echo 'BigQueryGetDatasetOperator is passed successfully!!'",
)
create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
retry_get_dataset = BigQueryGetDatasetOperator(
task_id="retry-get-dataset",
dataset_id=DATASET_NAME,
trigger_rule='one_success'
)
get_dataset_result = BashOperator(
task_id="get_dataset_result",
bash_command="echo \"{{ task_instance.xcom_pull('get-dataset')['id'] }}\"",
upload_stocks_data_to_gbq = PythonOperator(
task_id = "save_stocks_data_to_gbq",
python_callable=_save_stocks_data_to_gbq,
op_args=['AMZN',2022,1,1,2022,2,1,sa_path]
)
get_retry_dataset_result = BashOperator(
task_id="get_retry_dataset_result",
bash_command="echo \"{{ task_instance.xcom_pull('retry-get-dataset')['id'] }}\"",
validate_stocks_data = GreatExpectationsOperator(
task_id="validate_stocks_data",
checkpoint_name="stocks_expectations",
data_context_root_dir=GE_ROOT_DIR,
fail_task_on_validation_failure=True,
return_json_dict=True
)
# Main Stream
get_dataset >> On_previous_faild_task >> create_dataset >> retry_get_dataset >> get_retry_dataset_result
get_dataset >> On_previous_success_task >> get_dataset_result

upload_stocks_data_to_gbq >> validate_stocks_data
2 changes: 0 additions & 2 deletions dags/packages/stock_data_access.py/stock_data.py

This file was deleted.

File renamed without changes.
Binary file not shown.
Binary file not shown.
18 changes: 18 additions & 0 deletions dags/packages/stock_data_access/stock_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from pandas_datareader import data as pd_reader
import datetime as dt
from pandas import DataFrame
from google.oauth2 import service_account

def _save_stocks_data_to_gbq(ticker, syear, smonth, sday, eyear, emonth, eday,sa_path):
    """Download daily stock data for *ticker* from Yahoo Finance and load it into BigQuery.

    Args:
        ticker: Ticker symbol to fetch (e.g. ``'AMZN'``).
        syear, smonth, sday: Year/month/day of the start of the date window.
        eyear, emonth, eday: Year/month/day of the end of the date window.
        sa_path: Filesystem path to the GCP service-account JSON key.

    Side effects: replaces the ``stocks_storage.stocks_data`` table in the
    ``stocks-analytics-361812`` project with the freshly downloaded frame.
    Returns ``None``.
    """
    # Load credentials up front so a bad key path fails before any download work.
    gcp_credentials = service_account.Credentials.from_service_account_file(sa_path)

    window_start = dt.datetime(syear, smonth, sday)
    window_end = dt.datetime(eyear, emonth, eday)
    frame = pd_reader.DataReader(ticker, 'yahoo', window_start, window_end)

    # BigQuery column names cannot contain spaces, hence 'Adj Close' -> 'Adj_Close';
    # reset_index() promotes the Date index to a regular column so it is uploaded too.
    frame.rename(columns={'Adj Close': 'Adj_Close'}, inplace=True)
    frame.reset_index(inplace=True)

    frame.to_gbq(
        destination_table='stocks_storage.stocks_data',
        project_id='stocks-analytics-361812',
        credentials=gcp_credentials,
        if_exists='replace',
    )
1 change: 1 addition & 0 deletions great_expectations/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
uncommitted/
32 changes: 32 additions & 0 deletions great_expectations/checkpoints/stocks_expectations.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Great Expectations checkpoint definition: validates the BigQuery table
# stocks_storage.stocks_data (datasource "dwh") against the
# "Stocks Expectations" suite. Triggered from the Airflow DAG via
# GreatExpectationsOperator(checkpoint_name="stocks_expectations").
name: stocks_expectations
config_version: 1.0
template_name:
module_name: great_expectations.checkpoint
class_name: Checkpoint
# Timestamped run name so individual validation runs are distinguishable.
run_name_template: '%Y%m%d-%H%M%S-my-run-name-template'
# Left empty at the top level: the suite is set per-validation below.
expectation_suite_name:
batch_request: {}
# Post-validation actions: persist the result, store evaluation parameters,
# and rebuild the Data Docs site.
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: store_evaluation_params
    action:
      class_name: StoreEvaluationParametersAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction
      site_names: []
evaluation_parameters: {}
runtime_configuration: {}
validations:
  - batch_request:
      datasource_name: dwh
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: stocks_storage.stocks_data
      data_connector_query:
        # -1 selects the most recent batch of the asset.
        index: -1
    # Must match the suite file name ("Stocks Expectations.json") exactly.
    expectation_suite_name: Stocks Expectations
profilers: []
ge_cloud_id:
expectation_suite_ge_cloud_id:
1 change: 1 addition & 0 deletions great_expectations/expectations/.ge_store_backend_id
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
store_backend_id = 02573b09-194f-4b2d-a2f4-108fb8fda974
9 changes: 9 additions & 0 deletions great_expectations/expectations/2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"data_asset_type": null,
"expectation_suite_name": "2",
"expectations": [],
"ge_cloud_id": null,
"meta": {
"great_expectations_version": "0.15.21"
}
}
21 changes: 21 additions & 0 deletions great_expectations/expectations/Expectations for stocks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"data_asset_type": null,
"expectation_suite_name": "Expectations for stocks",
"expectations": [],
"ge_cloud_id": null,
"meta": {
"citations": [
{
"batch_request": {
"data_asset_name": "stocks.stocks-data",
"data_connector_name": "default_inferred_data_connector_name",
"datasource_name": "dwh",
"limit": 1000
},
"citation_date": "2022-09-07T14:52:41.144021Z",
"comment": "Created suite added via CLI"
}
],
"great_expectations_version": "0.15.21"
}
}
21 changes: 21 additions & 0 deletions great_expectations/expectations/Expectations of Stocks Data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"data_asset_type": null,
"expectation_suite_name": "Expectations of Stocks Data",
"expectations": [],
"ge_cloud_id": null,
"meta": {
"citations": [
{
"batch_request": {
"data_asset_name": "stocks_storage.stocks_data",
"data_connector_name": "default_inferred_data_connector_name",
"datasource_name": "dwh",
"limit": 1000
},
"citation_date": "2022-09-07T15:26:06.033289Z",
"comment": "Created suite added via CLI"
}
],
"great_expectations_version": "0.15.21"
}
}
9 changes: 9 additions & 0 deletions great_expectations/expectations/Stocks Data Expectations.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"data_asset_type": null,
"expectation_suite_name": "Stocks Data Expectations",
"expectations": [],
"ge_cloud_id": null,
"meta": {
"great_expectations_version": "0.15.21"
}
}
99 changes: 99 additions & 0 deletions great_expectations/expectations/Stocks Expectations.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
{
"data_asset_type": null,
"expectation_suite_name": "Stocks Expectations",
"expectations": [
{
"expectation_type": "expect_table_column_count_to_equal",
"kwargs": {
"value": 7
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_be_unique",
"kwargs": {
"column": "Date"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "Date"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_be_of_type",
"kwargs": {
"column": "Date",
"type_": "TIMESTAMP"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_be_of_type",
"kwargs": {
"column": "High",
"type_": "FLOAT"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_be_of_type",
"kwargs": {
"column": "Low",
"type_": "FLOAT"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_be_of_type",
"kwargs": {
"column": "Open",
"type_": "FLOAT"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_be_of_type",
"kwargs": {
"column": "Close",
"type_": "FLOAT"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_be_of_type",
"kwargs": {
"column": "Adj_Close",
"type_": "FLOAT"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_be_of_type",
"kwargs": {
"column": "Volume",
"type_": "INTEGER"
},
"meta": {}
}
],
"ge_cloud_id": null,
"meta": {
"citations": [
{
"batch_request": {
"data_asset_name": "stocks_storage.stocks_data",
"data_connector_name": "default_inferred_data_connector_name",
"datasource_name": "dwh",
"limit": 1000
},
"citation_date": "2022-09-07T15:50:23.555651Z",
"comment": "Created suite added via CLI"
}
],
"great_expectations_version": "0.15.21"
}
}
Loading

0 comments on commit 91fe219

Please sign in to comment.